This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8e20a80 DistributionSchedule uses custom wrapped int[] rather than
HashSet
8e20a80 is described below
commit 8e20a80b3d22ac1d14ba7a8f2046bdbbe5e95a44
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon Oct 30 20:47:18 2017 +0100
DistributionSchedule uses custom wrapped int[] rather than HashSet
This avoids the autoboxing on Integers and allocations of cells for
the hashset.
This also implies using a wrapped int[] object for the write set in the
DistributionSchedules and PlacementPolicies.
This patch was originally submitted as dc7933b on the yahoo-4.3
branch, though this has been modified extensively to remove the
dependency on carrotsearch hppc and to allow it to work with placement
policies which didn't exist at the time of the original patch.
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #657 from ivankelly/yahoo-backports
---
.../bookie/LocalBookieEnsemblePlacementPolicy.java | 13 +-
.../client/DefaultEnsemblePlacementPolicy.java | 21 +-
.../bookkeeper/client/DistributionSchedule.java | 106 ++++++-
.../bookkeeper/client/EnsemblePlacementPolicy.java | 30 +-
.../apache/bookkeeper/client/LedgerChecker.java | 14 +-
.../org/apache/bookkeeper/client/LedgerHandle.java | 3 +-
.../org/apache/bookkeeper/client/PendingAddOp.java | 32 +-
.../apache/bookkeeper/client/PendingReadOp.java | 28 +-
.../bookkeeper/client/PendingWriteLacOp.java | 24 +-
.../client/RackawareEnsemblePlacementPolicy.java | 20 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 98 ++++--
.../client/ReadLastConfirmedAndEntryOp.java | 19 +-
.../client/RegionAwareEnsemblePlacementPolicy.java | 148 +++++----
.../client/RoundRobinDistributionSchedule.java | 331 ++++++++++++++++++---
.../TopologyAwareEnsemblePlacementPolicy.java | 21 +-
.../client/RoundRobinDistributionScheduleTest.java | 41 ++-
.../bookkeeper/client/TestLedgerChecker.java | 11 +-
.../TestRackawareEnsemblePlacementPolicy.java | 184 +++++++++---
.../TestRegionAwareEnsemblePlacementPolicy.java | 153 ++++++----
19 files changed, 965 insertions(+), 332 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index fe18d5d..6bb8e5d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -88,14 +89,18 @@ public class LocalBookieEnsemblePlacementPolicy implements
EnsemblePlacementPoli
}
@Override
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return null;
}
@Override
- public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return null;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 14a8a0c..ffb229d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -146,21 +146,20 @@ public class DefaultEnsemblePlacementPolicy implements
EnsemblePlacementPolicy {
}
@Override
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long>
bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return writeSet;
}
@Override
- public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long>
bookieFailureHistory) {
- List<Integer> retList = new ArrayList<Integer>(writeSet);
- if (retList.size() < ensemble.size()) {
- for (int i = 0; i < ensemble.size(); i++) {
- if (!retList.contains(i)) {
- retList.add(i);
- }
- }
- }
- return retList;
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
+ writeSet.addMissingIndices(ensemble.size());
+ return writeSet;
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index c0d78e9..6db2b6c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.client;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import java.util.List;
import java.util.Map;
/**
@@ -32,12 +31,108 @@ import java.util.Map;
* to.
*/
-interface DistributionSchedule {
+public interface DistributionSchedule {
+
+ /**
+ * A write set represents the set of bookies to which
+ * a request will be written.
+ * The set consists of a list of indices, each of which can be
+ * used to look up the corresponding bookie in the ensemble.
+ */
+ public interface WriteSet {
+ /**
+ * The number of indices in the write set.
+ */
+ public int size();
+
+ /**
+ * Whether the set contains the given bookie index.
+ */
+ public boolean contains(int i);
+
+ /**
+ * Get the bookie index stored at position i.
+ */
+ public int get(int i);
+
+ /**
+ * Store the given bookie index at position i.
+ * @return the previous value at that position.
+ */
+ public int set(int i, int index);
+
+ /**
+ * Sort the indices in place.
+ */
+ public void sort();
+
+ /**
+ * Position of the specified bookie index within the write set,
+ * or -1 if not found.
+ */
+ public int indexOf(int index);
+
+ /**
+ * If we want a write set to cover all bookies in an ensemble
+ * of size X, then all of the indices from 0..X must exist in the
+ * write set. This method appends those which are missing to the
+ * end of the write set (e.g. with maxIndex = the ensemble size).
+ */
+ public void addMissingIndices(int maxIndex);
+
+ /**
+ * Move an index from one position to another,
+ * shifting the other indices accordingly.
+ */
+ public void moveAndShift(int from, int to);
+
+ /**
+ * Recycle this write set object once it is no longer in use.
+ */
+ public void recycle();
+
+ /**
+ * Make a deep copy of this write set.
+ */
+ public WriteSet copy();
+ }
+
+ public static WriteSet NULL_WRITE_SET = new WriteSet() { // empty write set; it never holds an index
+ @Override
+ public int size() { return 0; }
+ @Override
+ public boolean contains(int i) { return false; }
+ @Override
+ public int get(int i) {
+ throw new ArrayIndexOutOfBoundsException(); // there is no valid position to read
+ }
+ @Override
+ public int set(int i, int index) {
+ throw new ArrayIndexOutOfBoundsException(); // there is no valid position to write
+ }
+ @Override
+ public void sort() {} // nothing to sort
+ @Override
+ public int indexOf(int index) { return -1; }
+ @Override
+ public void addMissingIndices(int maxIndex) {
+ throw new ArrayIndexOutOfBoundsException(); // this set cannot grow
+ }
+ @Override
+ public void moveAndShift(int from, int to) {
+ throw new ArrayIndexOutOfBoundsException(); // there are no positions to move between
+ }
+ @Override
+ public void recycle() {} // no-op
+ @Override
+ public WriteSet copy() { return this; } // returns this same instance rather than a new object
+ };
/**
* return the set of bookie indices to send the message to
*/
- public List<Integer> getWriteSet(long entryId);
+ public WriteSet getWriteSet(long entryId);
+
/**
* An ack set represents the set of bookies from which
@@ -75,6 +170,11 @@ interface DistributionSchedule {
* Used for reissuing write requests.
*/
public boolean removeBookieAndCheck(int bookie);
+
+ /**
+ * Recycle this ack set when not used anymore
+ */
+ public void recycle();
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 8a9b86a..2301a95 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -294,16 +294,19 @@ public interface EnsemblePlacementPolicy {
*
* @param ensemble
* Ensemble to read entries.
- * @param writeSet
- * Write quorum to read entries.
* @param bookieFailureHistory
* Observed failures on the bookies
- * @return read sequence of bookies
+ * @param writeSet
+ * Write quorum to read entries. This will be modified, rather
than
+ * allocating a new WriteSet.
+ * @return The read sequence. This will be the same object as the passed in
+ * writeSet.
* @since 4.5
*/
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble,
- List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet);
/**
@@ -311,16 +314,19 @@ public interface EnsemblePlacementPolicy {
*
* @param ensemble
* Ensemble to read entries.
- * @param writeSet
- * Write quorum to read entries.
* @param bookieFailureHistory
* Observed failures on the bookies
- * @return read sequence of bookies
+ * @param writeSet
+ * Write quorum to read entries. This will be modified, rather
than
+ * allocating a new WriteSet.
+ * @return The read sequence. This will be the same object as the passed in
+ * writeSet.
* @since 4.5
*/
- public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress>
ensemble,
- List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet);
/**
* Send the bookie info details.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 2ad8076..0eebf3e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -266,7 +266,7 @@ public class LedgerChecker {
* Check that all the fragments in the passed in ledger, and report those
* which are missing.
*/
- public void checkLedger(LedgerHandle lh,
+ public void checkLedger(final LedgerHandle lh,
final GenericCallback<Set<LedgerFragment>> cb) {
// build a set of all fragment replicas
final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
@@ -322,7 +322,7 @@ public class LedgerChecker {
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;
- EntryExistsCallback eecb
+ final EntryExistsCallback eecb
= new
EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
new GenericCallback<Boolean>() {
public void
operationComplete(int rc, Boolean result) {
@@ -333,11 +333,13 @@ public class LedgerChecker {
}
});
- for (int bi :
lh.getDistributionSchedule().getWriteSet(entryToRead)) {
- BookieSocketAddress addr = curEnsemble.get(bi);
- bookieClient.readEntry(addr, lh.getId(),
- entryToRead, eecb, null);
+ DistributionSchedule.WriteSet writeSet
+ = lh.getDistributionSchedule().getWriteSet(entryToRead);
+ for (int i = 0; i < writeSet.size(); i++) {
+ BookieSocketAddress addr =
curEnsemble.get(writeSet.get(i));
+ bookieClient.readEntry(addr, lh.getId(), entryToRead,
eecb, null);
}
+ writeSet.recycle();
return;
} else {
fragments.add(lastLedgerFragment);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bb7c954..c61c85b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -817,8 +817,7 @@ public class LedgerHandle implements WriteHandle {
+") or length("+length+")");
}
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
- doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb,
ctx);
+ asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
}
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object
ctx) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index af4f35e..00a36b3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -22,9 +22,8 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-import java.util.HashSet;
+import java.util.Arrays;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
@@ -35,6 +34,7 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionException;
@@ -56,7 +56,6 @@ class PendingAddOp implements WriteCallback, TimerTask {
Object ctx;
long entryId;
int entryLength;
- Set<Integer> writeSet;
DistributionSchedule.AckSet ackSet;
boolean completed = false;
@@ -76,6 +75,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
this.cb = cb;
this.ctx = ctx;
this.entryId = LedgerHandle.INVALID_ENTRY_ID;
+
this.ackSet = lh.distributionSchedule.getAckSet();
this.addOpLogger = lh.bk.getAddOpLogger();
this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
@@ -92,7 +92,6 @@ class PendingAddOp implements WriteCallback, TimerTask {
void setEntryId(long entryId) {
this.entryId = entryId;
- writeSet = new
HashSet<Integer>(lh.distributionSchedule.getWriteSet(entryId));
}
long getEntryId() {
@@ -152,9 +151,15 @@ class PendingAddOp implements WriteCallback, TimerTask {
// completes.
//
// We call sendAddSuccessCallback when unsetting t cover this case.
- if (!writeSet.contains(bookieIndex)) {
- lh.sendAddSuccessCallbacks();
- return;
+ DistributionSchedule.WriteSet writeSet
+ = lh.distributionSchedule.getWriteSet(entryId);
+ try {
+ if (!writeSet.contains(bookieIndex)) {
+ lh.sendAddSuccessCallbacks();
+ return;
+ }
+ } finally {
+ writeSet.recycle();
}
if (callbackTriggered) {
@@ -192,8 +197,16 @@ class PendingAddOp implements WriteCallback, TimerTask {
// Retain the buffer until all writes are complete
this.toSend.retain();
this.entryLength = entryLength;
- for (int bookieIndex : writeSet) {
- sendWriteRequest(bookieIndex);
+
+ // Iterate over set and trigger the sendWriteRequests
+ DistributionSchedule.WriteSet writeSet
+ = lh.distributionSchedule.getWriteSet(entryId);
+ try {
+ for (int i = 0; i < writeSet.size(); i++) {
+ sendWriteRequest(writeSet.get(i));
+ }
+ } finally {
+ writeSet.recycle();
}
}
@@ -279,6 +292,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
if (ackQuorum && !completed) {
completed = true;
+ ackSet.recycle();
sendAddSuccessCallbacks();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 6b62c0f..40da31c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Enumeration;
import java.util.HashSet;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
@@ -87,7 +86,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
int numMissedEntryReads = 0;
final ArrayList<BookieSocketAddress> ensemble;
- final List<Integer> writeSet;
+ final DistributionSchedule.WriteSet writeSet;
LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
super(lId, eId);
@@ -95,10 +94,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
this.ensemble = ensemble;
if (lh.bk.isReorderReadSequence()) {
- this.writeSet =
lh.bk.getPlacementPolicy().reorderReadSequence(ensemble,
- lh.distributionSchedule.getWriteSet(entryId),
lh.bookieFailureHistory.asMap());
+ writeSet = lh.bk.getPlacementPolicy()
+ .reorderReadSequence(
+ ensemble,
+ lh.bookieFailureHistory.asMap(),
+ lh.distributionSchedule.getWriteSet(entryId));
} else {
- this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ writeSet = lh.distributionSchedule.getWriteSet(entryId);
}
}
@@ -137,6 +139,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
*/
length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
data = content;
+ writeSet.recycle();
return true;
} else {
buffer.release();
@@ -155,6 +158,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
if (complete.compareAndSet(false, true)) {
this.rc = rc;
submitCallback(rc);
+ writeSet.recycle();
return true;
} else {
return false;
@@ -269,10 +273,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
@Override
void read() {
- for (int bookieIndex : writeSet) {
- BookieSocketAddress to = ensemble.get(bookieIndex);
+ for (int i = 0; i < writeSet.size(); i++) {
+ BookieSocketAddress to = ensemble.get(writeSet.get(i));
try {
- sendReadTo(bookieIndex, to, this);
+ sendReadTo(writeSet.get(i), to, this);
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry {} : ", this, ie);
Thread.currentThread().interrupt();
@@ -322,10 +326,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
return nextReplicaIndexToReadFrom;
}
- private int getReplicaIndex(int bookieIndex) {
- return writeSet.indexOf(bookieIndex);
- }
-
private BitSet getSentToBitSet() {
BitSet b = new BitSet(ensemble.size());
@@ -386,7 +386,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
int replica = nextReplicaIndexToReadFrom;
- int bookieIndex =
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
+ int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
@@ -406,7 +406,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
synchronized void logErrorAndReattemptRead(int bookieIndex,
BookieSocketAddress host, String errMsg, int rc) {
super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
- int replica = getReplicaIndex(bookieIndex);
+ int replica = writeSet.indexOf(bookieIndex);
if (replica == NOT_FOUND) {
LOG.error("Received error from a host which is not in the
ensemble {} {}.", host, ensemble);
return;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 755f93d..3ab5c7b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -17,7 +17,7 @@
*/
package org.apache.bookkeeper.client;
-
+import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;
@@ -45,8 +45,7 @@ class PendingWriteLacOp implements WriteLacCallback {
AddLacCallback cb;
long lac;
Object ctx;
- Set<Integer> writeSet;
- Set<Integer> receivedResponseSet;
+ BitSet receivedResponseSet;
DistributionSchedule.AckSet ackSet;
boolean completed = false;
@@ -66,8 +65,11 @@ class PendingWriteLacOp implements WriteLacCallback {
void setLac(long lac) {
this.lac = lac;
- this.writeSet = new
HashSet<Integer>(lh.distributionSchedule.getWriteSet(lac));
- this.receivedResponseSet = new HashSet<Integer>(writeSet);
+
+ this.receivedResponseSet = new BitSet(
+ lh.getLedgerMetadata().getWriteQuorumSize());
+ this.receivedResponseSet.set(0,
+ lh.getLedgerMetadata().getWriteQuorumSize());
}
void sendWriteLacRequest(int bookieIndex) {
@@ -77,8 +79,14 @@ class PendingWriteLacOp implements WriteLacCallback {
void initiate(ByteBuf toSend) {
this.toSend = toSend;
- for (int bookieIndex: writeSet) {
- sendWriteLacRequest(bookieIndex);
+ DistributionSchedule.WriteSet writeSet
+ = lh.distributionSchedule.getWriteSet(lac);
+ try {
+ for (int i = 0; i < writeSet.size(); i++) {
+ sendWriteLacRequest(writeSet.get(i));
+ }
+ } finally {
+ writeSet.recycle();
}
}
@@ -95,7 +103,7 @@ class PendingWriteLacOp implements WriteLacCallback {
}
// We got response.
- receivedResponseSet.remove(bookieIndex);
+ receivedResponseSet.clear(bookieIndex);
if (rc == BKException.Code.OK) {
if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 8126b96..b3b633f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -118,17 +118,21 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
}
@Override
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble,
- List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory) {
- return super.reorderReadSequence(ensemble, writeSet,
bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
+ return super.reorderReadSequence(ensemble, bookieFailureHistory,
+ writeSet);
}
@Override
- public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress>
ensemble,
- List<Integer> writeSet,
- Map<BookieSocketAddress, Long>
bookieFailureHistory) {
- return super.reorderReadLACSequence(ensemble, writeSet,
bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
+ return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+ writeSet);
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 527f180..b08ce43 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
@@ -71,6 +73,15 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
static final int RACKNAME_DISTANCE_FROM_LEAVES = 1;
+ // masks for reordering
+ static final int LOCAL_MASK = 0x01 << 24;
+ static final int LOCAL_FAIL_MASK = 0x02 << 24;
+ static final int REMOTE_MASK = 0x04 << 24;
+ static final int REMOTE_FAIL_MASK = 0x08 << 24;
+ static final int READ_ONLY_MASK = 0x10 << 24;
+ static final int UNAVAIL_MASK = 0x20 << 24;
+ static final int MASK_BITS = 0xFFF << 20;
+
static class DefaultResolver implements DNSToSwitchMapping {
final Supplier<String> defaultRackSupplier;
@@ -769,47 +780,86 @@ class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsemblePlacemen
}
@Override
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long>
bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
int ensembleSize = ensemble.size();
- List<Integer> finalList = new ArrayList<Integer>(writeSet.size());
- List<Long> observedFailuresList = new ArrayList<Long>(writeSet.size());
- List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size());
- List<Integer> unAvailableList = new
ArrayList<Integer>(writeSet.size());
- for (Integer idx : writeSet) {
+
+ for (int i = 0; i < writeSet.size(); i++) {
+ int idx = writeSet.get(i);
BookieSocketAddress address = ensemble.get(idx);
Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
if (null == knownBookies.get(address)) {
- // there isn't too much differences between readonly bookies
from unavailable bookies. since there
- // is no write requests to them, so we shouldn't try reading
from readonly bookie in prior to writable
+ // there isn't too much differences between readonly bookies
+ // from unavailable bookies. since there
+ // is no write requests to them, so we shouldn't try reading
+ // from readonly bookie in prior to writable
// bookies.
- if ((null == readOnlyBookies) ||
!readOnlyBookies.contains(address)) {
- unAvailableList.add(idx);
+ if ((null == readOnlyBookies)
+ || !readOnlyBookies.contains(address)) {
+ writeSet.set(i, idx | UNAVAIL_MASK);
} else {
- readOnlyList.add(idx);
+ writeSet.set(i, idx | READ_ONLY_MASK);
}
} else {
- if ((lastFailedEntryOnBookie == null) ||
(lastFailedEntryOnBookie < 0)) {
- finalList.add(idx);
+ if ((lastFailedEntryOnBookie == null)
+ || (lastFailedEntryOnBookie < 0)) {
+ writeSet.set(i, idx | LOCAL_MASK);
} else {
- observedFailuresList.add(lastFailedEntryOnBookie *
ensembleSize + idx);
+ long failIdx = lastFailedEntryOnBookie * ensembleSize +
idx;
+ writeSet.set(i,
+ (int)(failIdx & ~MASK_BITS) |
LOCAL_FAIL_MASK);
}
}
}
- if (reorderReadsRandom) {
- Collections.shuffle(finalList);
- Collections.shuffle(readOnlyList);
- Collections.shuffle(unAvailableList);
+ // Add a mask to ensure the sort is stable, sort,
+ // and then remove mask. This maintains stability as
+ // long as there are fewer than 16 bookies in the write set.
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
+ }
+ writeSet.sort();
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
}
- Collections.sort(observedFailuresList);
+ if (reorderReadsRandom) {
+ shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
+ }
- for(long value: observedFailuresList) {
- finalList.add((int)(value % ensembleSize));
+ // remove all masks
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, (writeSet.get(i) & ~MASK_BITS) % ensembleSize);
}
- finalList.addAll(readOnlyList);
- finalList.addAll(unAvailableList);
- return finalList;
+ return writeSet;
+ }
+
+ /**
+ * Shuffle in place the entries of the write set whose {@code bits} equal {@code mask}.
+ * It assumes all such entries are contiguous; swaps never leave that run.
+ */
+ static void shuffleWithMask(DistributionSchedule.WriteSet writeSet,
+ int mask, int bits) {
+ int first = -1;
+ int last = -1;
+ for (int i = 0; i < writeSet.size(); i++) {
+ if ((writeSet.get(i) & bits) == mask) {
+ if (first == -1) {
+ first = i;
+ }
+ last = i;
+ }
+ }
+ if (first != -1) {
+ for (int i = last; i > first; i--) { // Fisher-Yates over [first, last] only
+ int swapWith = ThreadLocalRandom.current().nextInt(first, i + 1);
+ writeSet.set(swapWith, writeSet.set(i, writeSet.get(swapWith)));
+ }
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 05434d1..ae721d5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +74,8 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
int numMissedEntryReads = 0;
final ArrayList<BookieSocketAddress> ensemble;
- final List<Integer> writeSet;
- final List<Integer> orderedEnsemble;
+ final DistributionSchedule.WriteSet writeSet;
+ final DistributionSchedule.WriteSet orderedEnsemble;
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
super(lId, eId);
@@ -83,9 +84,9 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
- writeSet, lh.bookieFailureHistory.asMap());
+ lh.bookieFailureHistory.asMap(), writeSet.copy());
} else {
- this.orderedEnsemble = writeSet;
+ this.orderedEnsemble = writeSet.copy();
}
}
@@ -121,6 +122,8 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
}
if (!complete.getAndSet(true)) {
+ writeSet.recycle();
+ orderedEnsemble.recycle();
rc = BKException.Code.OK;
this.entryId = entryId;
/*
@@ -144,6 +147,8 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
*/
boolean fail(int rc) {
if (complete.compareAndSet(false, true)) {
+ writeSet.recycle();
+ orderedEnsemble.recycle();
this.rc = rc;
translateAndSetFirstError(rc);
completeRequest();
@@ -244,10 +249,10 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
@Override
void read() {
- for (int bookieIndex : orderedEnsemble) {
- BookieSocketAddress to = ensemble.get(bookieIndex);
+ for (int i = 0; i < orderedEnsemble.size(); i++) {
+ BookieSocketAddress to = ensemble.get(orderedEnsemble.get(i));
try {
- sendReadTo(bookieIndex, to, this);
+ sendReadTo(orderedEnsemble.get(i), to, this);
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry {} : ", this, ie);
Thread.currentThread().interrupt();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 7d4d160..8342a6b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.client;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -500,107 +501,130 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
}
@Override
- public final List<Integer>
reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer>
writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ public final DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
if (UNKNOWN_REGION.equals(myRegion)) {
- return super.reorderReadSequence(ensemble, writeSet,
bookieFailureHistory);
+ return super.reorderReadSequence(ensemble, bookieFailureHistory,
+ writeSet);
} else {
int ensembleSize = ensemble.size();
- List<Integer> finalList = new ArrayList<Integer>(writeSet.size());
- List<Integer> localList = new ArrayList<Integer>(writeSet.size());
- List<Long> localFailures = new ArrayList<Long>(writeSet.size());
- List<Integer> remoteList = new ArrayList<Integer>(writeSet.size());
- List<Long> remoteFailures = new ArrayList<Long>(writeSet.size());
- List<Integer> readOnlyList = new
ArrayList<Integer>(writeSet.size());
- List<Integer> unAvailableList = new
ArrayList<Integer>(writeSet.size());
- for (Integer idx : writeSet) {
+
+ for (int i = 0; i < writeSet.size(); i++) {
+ int idx = writeSet.get(i);
BookieSocketAddress address = ensemble.get(idx);
String region = getRegion(address);
Long lastFailedEntryOnBookie =
bookieFailureHistory.get(address);
if (null == knownBookies.get(address)) {
- // there isn't too much differences between readonly
bookies from unavailable bookies. since there
- // is no write requests to them, so we shouldn't try
reading from readonly bookie in prior to writable
- // bookies.
- if ((null == readOnlyBookies) ||
!readOnlyBookies.contains(address)) {
- unAvailableList.add(idx);
+ // there isn't too much differences between readonly
bookies
+ // from unavailable bookies. since there
+ // is no write requests to them, so we shouldn't try
reading
+ // from readonly bookie in prior to writable bookies.
+ if ((null == readOnlyBookies)
+ || !readOnlyBookies.contains(address)) {
+ writeSet.set(i, idx | UNAVAIL_MASK);
} else {
- readOnlyList.add(idx);
+ writeSet.set(i, idx | READ_ONLY_MASK);
}
} else if (region.equals(myRegion)) {
- if ((lastFailedEntryOnBookie == null) ||
(lastFailedEntryOnBookie < 0)) {
- localList.add(idx);
+ if ((lastFailedEntryOnBookie == null)
+ || (lastFailedEntryOnBookie < 0)) {
+ writeSet.set(i, idx | LOCAL_MASK);
} else {
- localFailures.add(lastFailedEntryOnBookie *
ensembleSize + idx);
+ long failIdx
+ = lastFailedEntryOnBookie * ensembleSize + idx;
+ writeSet.set(i, (int)(failIdx & ~MASK_BITS)
+ | LOCAL_FAIL_MASK);
}
} else {
- if ((lastFailedEntryOnBookie == null) ||
(lastFailedEntryOnBookie < 0)) {
- remoteList.add(idx);
+ if ((lastFailedEntryOnBookie == null)
+ || (lastFailedEntryOnBookie < 0)) {
+ writeSet.set(i, idx | REMOTE_MASK);
} else {
- remoteFailures.add(lastFailedEntryOnBookie *
ensembleSize + idx);
+ long failIdx
+ = lastFailedEntryOnBookie * ensembleSize + idx;
+ writeSet.set(i, (int)(failIdx & ~MASK_BITS)
+ | REMOTE_FAIL_MASK);
}
}
}
- // Given that idx is less than ensemble size the order of the
elements in these two lists
- // is determined by the lastFailedEntryOnBookie
- Collections.sort(localFailures);
- Collections.sort(remoteFailures);
+ // Add a mask to ensure the sort is stable, sort,
+ // and then remove mask. This maintains stability as
+ // long as there are fewer than 16 bookies in the write set.
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) | ((i & 0xF) << 20));
+ }
+ writeSet.sort();
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) & ~((0xF) << 20));
+ }
if (reorderReadsRandom) {
- Collections.shuffle(localList);
- Collections.shuffle(remoteList);
- Collections.shuffle(readOnlyList);
- Collections.shuffle(unAvailableList);
+ shuffleWithMask(writeSet, LOCAL_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, REMOTE_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, READ_ONLY_MASK, MASK_BITS);
+ shuffleWithMask(writeSet, UNAVAIL_MASK, MASK_BITS);
}
// nodes within a region are ordered as follows
// (Random?) list of nodes that have no history of failure
// Nodes with Failure history are ordered in the reverse
// order of the most recent entry that generated an error
- for(long value: localFailures) {
- localList.add((int)(value % ensembleSize));
- }
-
- for(long value: remoteFailures) {
- remoteList.add((int)(value % ensembleSize));
+ // The sort will have put them in correct order,
+ // so remove the bits that sort by age.
+ for (int i = 0; i < writeSet.size(); i++) {
+ int mask = writeSet.get(i) & MASK_BITS;
+ int idx = (writeSet.get(i) & ~MASK_BITS) % ensembleSize;
+ if (mask == LOCAL_FAIL_MASK) {
+ writeSet.set(i, LOCAL_MASK | idx);
+ } else if (mask == REMOTE_FAIL_MASK) {
+ writeSet.set(i, REMOTE_MASK | idx);
+ }
}
- // Insert a node from the remote region at the specified location
so we
- // try more than one region within the max allowed latency
- for (int i = 0; i < REMOTE_NODE_IN_REORDER_SEQUENCE; i++) {
- if (localList.size() > 0) {
- finalList.add(localList.remove(0));
- } else {
+ // Insert a node from the remote region at the specified location
so
+ // we try more than one region within the max allowed latency
+ int firstRemote = -1;
+ for (int i = 0; i < writeSet.size(); i++) {
+ if ((writeSet.get(i) & MASK_BITS) == REMOTE_MASK) {
+ firstRemote = i;
break;
}
}
-
- if (remoteList.size() > 0) {
- finalList.add(remoteList.remove(0));
+ if (firstRemote != -1) {
+ int i = 0;
+ for (;i < REMOTE_NODE_IN_REORDER_SEQUENCE
+ && i < writeSet.size(); i++) {
+ if ((writeSet.get(i) & MASK_BITS) != LOCAL_MASK) {
+ break;
+ }
+ }
+ writeSet.moveAndShift(firstRemote, i);
}
- // Add all the local nodes
- finalList.addAll(localList);
- finalList.addAll(remoteList);
- finalList.addAll(readOnlyList);
- finalList.addAll(unAvailableList);
- return finalList;
+
+ // remove all masks
+ for (int i = 0; i < writeSet.size(); i++) {
+ writeSet.set(i, writeSet.get(i) & ~MASK_BITS);
+ }
+ return writeSet;
}
}
@Override
- public final List<Integer>
reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer>
writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ public final DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
if (UNKNOWN_REGION.equals(myRegion)) {
- return super.reorderReadLACSequence(ensemble, writeSet,
bookieFailureHistory);
- }
- List<Integer> finalList = reorderReadSequence(ensemble, writeSet,
bookieFailureHistory);
-
- if (finalList.size() < ensemble.size()) {
- for (int i = 0; i < ensemble.size(); i++) {
- if (!finalList.contains(i)) {
- finalList.add(i);
- }
- }
+ return super.reorderReadLACSequence(ensemble, bookieFailureHistory,
+ writeSet);
}
+ DistributionSchedule.WriteSet finalList
+ = reorderReadSequence(ensemble, bookieFailureHistory, writeSet);
+ finalList.addMissingIndices(ensemble.size());
return finalList;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index 4eaf37b..ca7ca48 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -19,12 +19,18 @@ package org.apache.bookkeeper.client;
import com.google.common.collect.ImmutableMap;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.commons.lang3.ArrayUtils;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.BitSet;
import java.util.Map;
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
/**
* A specific {@link DistributionSchedule} that places entries in round-robin
@@ -34,10 +40,9 @@ import java.util.Map;
*
*/
class RoundRobinDistributionSchedule implements DistributionSchedule {
- private int writeQuorumSize;
- private int ackQuorumSize;
- private int ensembleSize;
-
+ private final int writeQuorumSize;
+ private final int ackQuorumSize;
+ private final int ensembleSize;
public RoundRobinDistributionSchedule(int writeQuorumSize, int
ackQuorumSize, int ensembleSize) {
this.writeQuorumSize = writeQuorumSize;
@@ -46,44 +51,295 @@ class RoundRobinDistributionSchedule implements
DistributionSchedule {
}
@Override
- public List<Integer> getWriteSet(long entryId) {
- List<Integer> set = new ArrayList<Integer>();
- for (int i = 0; i < this.writeQuorumSize; i++) {
- set.add((int)((entryId + i) % ensembleSize));
+ public WriteSet getWriteSet(long entryId) {
+ return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
+ }
+
+ @VisibleForTesting
+ static WriteSet writeSetFromValues(Integer... values) {
+ WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
+ writeSet.setSize(values.length);
+ for (int i = 0; i < values.length; i++) {
+ writeSet.set(i, values[i]);
+ }
+ return writeSet;
+ }
+
+ private static class WriteSetImpl implements WriteSet {
+ int[] array = null;
+ int size;
+
+ private final Handle<WriteSetImpl> recyclerHandle;
+ private static final Recycler<WriteSetImpl> RECYCLER
+ = new Recycler<WriteSetImpl>() {
+ protected WriteSetImpl newObject(
+ Recycler.Handle<WriteSetImpl> handle) {
+ return new WriteSetImpl(handle);
+ }
+ };
+
+ private WriteSetImpl(Handle<WriteSetImpl> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ static WriteSetImpl create(int ensembleSize,
+ int writeQuorumSize,
+ long entryId) {
+ WriteSetImpl writeSet = RECYCLER.get();
+ writeSet.reset(ensembleSize, writeQuorumSize, entryId);
+ return writeSet;
+ }
+
+ private void reset(int ensembleSize, int writeQuorumSize,
+ long entryId) {
+ setSize(writeQuorumSize);
+ for (int w = 0; w < writeQuorumSize; w++) {
+ set(w, (int)((entryId + w) % ensembleSize));
+ }
+ }
+
+ private void setSize(int newSize) {
+ if (array == null) {
+ array = new int[newSize];
+ } else if (newSize > array.length) {
+ int[] newArray = new int[newSize];
+ System.arraycopy(array, 0,
+ newArray, 0, array.length);
+ array = newArray;
+ }
+ size = newSize;
+ }
+
+ @Override
+ public int size() { return size; }
+
+ @Override
+ public boolean contains(int i) {
+ return indexOf(i) != -1;
+ }
+
+ @Override
+ public int get(int i) {
+ checkBounds(i);
+ return array[i];
+ }
+
+ @Override
+ public int set(int i, int index) {
+ checkBounds(i);
+ int oldVal = array[i];
+ array[i] = index;
+ return oldVal;
+ }
+
+ @Override
+ public void sort() {
+ Arrays.sort(array, 0, size);
+ }
+
+ @Override
+ public int indexOf(int index) {
+ for (int j = 0; j < size; j++) {
+ if (array[j] == index) {
+ return j;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public void addMissingIndices(int maxIndex) {
+ if (size < maxIndex) {
+ int oldSize = size;
+ setSize(maxIndex);
+ for (int i = 0, j = oldSize;
+ i < maxIndex && j < maxIndex; i++) {
+ if (!contains(i)) {
+ set(j, i);
+ j++;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void moveAndShift(int from, int to) {
+ checkBounds(from);
+ checkBounds(to);
+ if (from > to) {
+ int tmp = array[from];
+ for (int i = from; i > to; i--) {
+ array[i] = array[i-1];
+ }
+ array[to] = tmp;
+ } else if (from < to) {
+ int tmp = array[from];
+ for (int i = from; i < to; i++) {
+ array[i] = array[i+1];
+ }
+ array[to] = tmp;
+ }
+ }
+
+ @Override
+ public void recycle() {
+ recyclerHandle.recycle(this);
+ }
+
+ @Override
+ public WriteSet copy() {
+ WriteSetImpl copy = RECYCLER.get();
+ copy.setSize(size);
+ for (int i = 0; i < size; i++) {
+ copy.set(i, array[i]);
+ }
+ return copy;
+ }
+
+ @Override
+ public int hashCode() {
+ int sum = 0;
+ for (int i = 0; i < size; i++) {
+ sum += sum * 31 + i;
+ }
+ return sum;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof WriteSetImpl) {
+ WriteSetImpl o = (WriteSetImpl)other;
+ if (o.size() != size()) {
+ return false;
+ }
+ for (int i = 0; i < size(); i++) {
+ if (o.get(i) != get(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder b = new StringBuilder("WriteSet[");
+ int i = 0;
+ for (; i < size() - 1; i++) {
+ b.append(get(i)).append(",");
+ }
+ b.append(get(i)).append("]");
+ return b.toString();
+ }
+
+ private void checkBounds(int i) {
+ if (i < 0 || i > size) {
+ throw new IndexOutOfBoundsException(
+ "Index " + i + " out of bounds, array size = " + size);
+ }
}
- return set;
}
@Override
public AckSet getAckSet() {
- final HashSet<Integer> ackSet = new HashSet<Integer>();
- final HashMap<Integer, BookieSocketAddress> failureMap =
- new HashMap<Integer, BookieSocketAddress>();
- return new AckSet() {
- public boolean completeBookieAndCheck(int bookieIndexHeardFrom) {
- failureMap.remove(bookieIndexHeardFrom);
- ackSet.add(bookieIndexHeardFrom);
- return ackSet.size() >= ackQuorumSize;
+ return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
+ }
+
+ private static class AckSetImpl implements AckSet {
+ private int writeQuorumSize;
+ private int ackQuorumSize;
+ private final BitSet ackSet = new BitSet();
+ // grows on reset()
+ private BookieSocketAddress[] failureMap = new BookieSocketAddress[0];
+
+ private final Handle<AckSetImpl> recyclerHandle;
+ private static final Recycler<AckSetImpl> RECYCLER = new
Recycler<AckSetImpl>() {
+ protected AckSetImpl newObject(Recycler.Handle<AckSetImpl> handle)
{
+ return new AckSetImpl(handle);
}
+ };
- @Override
- public boolean failBookieAndCheck(int bookieIndexHeardFrom,
BookieSocketAddress address) {
- ackSet.remove(bookieIndexHeardFrom);
- failureMap.put(bookieIndexHeardFrom, address);
- return failureMap.size() > (writeQuorumSize - ackQuorumSize);
+ private AckSetImpl(Handle<AckSetImpl> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ static AckSetImpl create(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize) {
+ AckSetImpl ackSet = RECYCLER.get();
+ ackSet.reset(ensembleSize, writeQuorumSize, ackQuorumSize);
+ return ackSet;
+ }
+
+ private void reset(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize) {
+ this.ackQuorumSize = ackQuorumSize;
+ this.writeQuorumSize = writeQuorumSize;
+ ackSet.clear();
+ if (failureMap.length < ensembleSize) {
+ failureMap = new BookieSocketAddress[ensembleSize];
}
+ Arrays.fill(failureMap, null);
+ }
+
+ @Override
+ public boolean completeBookieAndCheck(int bookieIndexHeardFrom) {
+ failureMap[bookieIndexHeardFrom] = null;
+ ackSet.set(bookieIndexHeardFrom);
+ return ackSet.cardinality() >= ackQuorumSize;
+ }
+
+ @Override
+ public boolean failBookieAndCheck(int bookieIndexHeardFrom,
+ BookieSocketAddress address) {
+ ackSet.clear(bookieIndexHeardFrom);
+ failureMap[bookieIndexHeardFrom] = address;
+ return failed() > (writeQuorumSize - ackQuorumSize);
+ }
- @Override
- public Map<Integer, BookieSocketAddress> getFailedBookies() {
- return ImmutableMap.copyOf(failureMap);
+ @Override
+ public Map<Integer, BookieSocketAddress> getFailedBookies() {
+ ImmutableMap.Builder<Integer, BookieSocketAddress> builder
+ = new ImmutableMap.Builder<>();
+ for (int i = 0; i < failureMap.length; i++) {
+ if (failureMap[i] != null) {
+ builder.put(i, failureMap[i]);
+ }
}
+ return builder.build();
+ }
- public boolean removeBookieAndCheck(int bookie) {
- ackSet.remove(bookie);
- failureMap.remove(bookie);
- return ackSet.size() >= ackQuorumSize;
+ @Override
+ public boolean removeBookieAndCheck(int bookie) {
+ ackSet.clear(bookie);
+ failureMap[bookie] = null;
+ return ackSet.cardinality() >= ackQuorumSize;
+ }
+
+ @Override
+ public void recycle() {
+ recyclerHandle.recycle(this);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("ackQuorumSize", ackQuorumSize)
+ .add("ackSet", ackSet)
+ .add("failureMap", failureMap).toString();
+ }
+
+ private int failed() {
+ int count = 0;
+ for (int i = 0; i < failureMap.length; i++) {
+ if (failureMap[i] != null) {
+ count++;
+ }
}
- };
+ return count;
+ }
}
private class RRQuorumCoverageSet implements QuorumCoverageSet {
@@ -136,6 +392,11 @@ class RoundRobinDistributionSchedule implements
DistributionSchedule {
@Override
public boolean hasEntry(long entryId, int bookieIndex) {
- return getWriteSet(entryId).contains(bookieIndex);
+ WriteSet w = getWriteSet(entryId);
+ try {
+ return w.contains(bookieIndex);
+ } finally {
+ w.recycle();
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index f72b456..c99b4fc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -452,20 +452,21 @@ abstract class TopologyAwareEnsemblePlacementPolicy
implements ITopologyAwareEns
}
@Override
- public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long>
bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return writeSet;
}
@Override
- public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress>
ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long>
bookieFailureHistory) {
- List<Integer> retList = new
ArrayList<Integer>(reorderReadSequence(ensemble, writeSet,
bookieFailureHistory));
- if (retList.size() < ensemble.size()) {
- for (int i = 0; i < ensemble.size(); i++) {
- if (!retList.contains(i)) {
- retList.add(i);
- }
- }
- }
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<BookieSocketAddress, Long> bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
+ DistributionSchedule.WriteSet retList = reorderReadSequence(
+ ensemble, bookieFailureHistory, writeSet);
+ retList.addMissingIndices(ensemble.size());
return retList;
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index c4cb49a..a4ef158 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -21,12 +21,13 @@
package org.apache.bookkeeper.client;
-import java.util.List;
import java.util.Set;
import java.util.HashSet;
+
import com.google.common.collect.Sets;
import org.junit.Test;
+import static
org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.junit.Assert.*;
import org.slf4j.Logger;
@@ -39,14 +40,17 @@ public class RoundRobinDistributionScheduleTest {
public void testDistributionSchedule() throws Exception {
RoundRobinDistributionSchedule schedule = new
RoundRobinDistributionSchedule(3, 2, 5);
- List<Integer> wSet = schedule.getWriteSet(1);
+ DistributionSchedule.WriteSet wSet = schedule.getWriteSet(1);
assertEquals("Write set is wrong size", wSet.size(), 3);
-
DistributionSchedule.AckSet ackSet = schedule.getAckSet();
- assertFalse("Shouldn't ack yet",
ackSet.completeBookieAndCheck(wSet.get(0)));
- assertFalse("Shouldn't ack yet",
ackSet.completeBookieAndCheck(wSet.get(0)));
- assertTrue("Should ack after 2 unique",
ackSet.completeBookieAndCheck(wSet.get(2)));
- assertTrue("Should still be acking",
ackSet.completeBookieAndCheck(wSet.get(1)));
+ assertFalse("Shouldn't ack yet",
+ ackSet.completeBookieAndCheck(wSet.get(0)));
+ assertFalse("Shouldn't ack yet",
+ ackSet.completeBookieAndCheck(wSet.get(0)));
+ assertTrue("Should ack after 2 unique",
+ ackSet.completeBookieAndCheck(wSet.get(2)));
+ assertTrue("Should still be acking",
+ ackSet.completeBookieAndCheck(wSet.get(1)));
}
/**
@@ -129,4 +133,27 @@ public class RoundRobinDistributionScheduleTest {
}
return errors;
}
+
+ @Test
+ public void testMoveAndShift() {
+ DistributionSchedule.WriteSet w = writeSetFromValues(1,2,3,4,5);
+ w.moveAndShift(3, 1);
+ assertEquals(w, writeSetFromValues(1,4,2,3,5));
+
+ w = writeSetFromValues(1,2,3,4,5);
+ w.moveAndShift(1, 3);
+ assertEquals(w, writeSetFromValues(1,3,4,2,5));
+
+ w = writeSetFromValues(1,2,3,4,5);
+ w.moveAndShift(0, 4);
+ assertEquals(w, writeSetFromValues(2,3,4,5,1));
+
+ w = writeSetFromValues(1,2,3,4,5);
+ w.moveAndShift(0, 0);
+ assertEquals(w, writeSetFromValues(1,2,3,4,5));
+
+ w = writeSetFromValues(1,2,3,4,5);
+ w.moveAndShift(4, 4);
+ assertEquals(w, writeSetFromValues(1,2,3,4,5));
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 3bcac7d..442fca5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -365,8 +365,10 @@ public class TestLedgerChecker extends
BookKeeperClusterTestCase {
}
ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
.getEnsembles().get(0L);
- BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(
-
lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(0));
+ DistributionSchedule.WriteSet writeSet
+ = lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed());
+ BookieSocketAddress lastBookieFromEnsemble
+ = firstEnsemble.get(writeSet.get(0));
LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+ firstEnsemble);
killBookie(lastBookieFromEnsemble);
@@ -374,8 +376,9 @@ public class TestLedgerChecker extends
BookKeeperClusterTestCase {
lh.addEntry(TEST_LEDGER_ENTRY_DATA);
- lastBookieFromEnsemble = firstEnsemble.get(
-
lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(1));
+ writeSet = lh.getDistributionSchedule().getWriteSet(
+ lh.getLastAddPushed());
+ lastBookieFromEnsemble = firstEnsemble.get(writeSet.get(1));
LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
+ firstEnsemble);
killBookie(lastBookieFromEnsemble);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bbf95e9..c79679a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -18,6 +18,8 @@
package org.apache.bookkeeper.client;
import static
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS;
+import static
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.shuffleWithMask;
+import static
org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import java.net.InetAddress;
@@ -53,7 +55,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
RackawareEnsemblePlacementPolicy repp;
final ArrayList<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
- final List<Integer> writeSet = new ArrayList<Integer>();
+ DistributionSchedule.WriteSet writeSet
+ = DistributionSchedule.NULL_WRITE_SET;
ClientConfiguration conf = new ClientConfiguration();
BookieSocketAddress addr1, addr2, addr3, addr4;
io.netty.util.HashedWheelTimer timer;
@@ -80,9 +83,7 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
ensemble.add(addr2);
ensemble.add(addr3);
ensemble.add(addr4);
- for (int i = 0; i < 4; i++) {
- writeSet.add(i);
- }
+ writeSet = writeSetFromValues(0,1,2,3);
timer = new HashedWheelTimer(
new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
@@ -125,15 +126,15 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.remove(addr1);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(1);
- expectedSet.add(2);
- expectedSet.add(3);
- expectedSet.add(0);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(),
+ writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(1, 2, 3, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -157,15 +158,14 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
ro.add(addr1);
repp.onClusterChanged(addrs, ro);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(1);
- expectedSet.add(2);
- expectedSet.add(3);
- expectedSet.add(0);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(1, 2, 3, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -188,15 +188,14 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
addrs.remove(addr2);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(2);
- expectedSet.add(3);
- expectedSet.add(0);
- expectedSet.add(1);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(2, 3, 0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -220,14 +219,13 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
Set<BookieSocketAddress> roAddrs = new HashSet<BookieSocketAddress>();
roAddrs.add(addr2);
repp.onClusterChanged(addrs, roAddrs);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(2);
- expectedSet.add(3);
- expectedSet.add(1);
- expectedSet.add(0);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(2, 3, 1, 0);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -716,7 +714,8 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
bookieFailures.put(addr1, 20L);
bookieFailures.put(addr2, 22L);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
bookieFailures);
+ DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
+ ensemble, bookieFailures, writeSet);
LOG.info("reorder set : {}", reoderSet);
assertEquals(ensemble.get(reoderSet.get(2)), addr1);
assertEquals(ensemble.get(reoderSet.get(3)), addr2);
@@ -758,4 +757,109 @@ public class TestRackawareEnsemblePlacementPolicy extends
TestCase {
ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4,
null, new HashSet<BookieSocketAddress>());
assertTrue(ensemble.contains(addr4));
}
+
+ @Test
+ public void testShuffleWithMask() {
+ int mask = 0xE1 << 16;
+ int maskBits = 0xFF << 16;
+ boolean shuffleOccurred = false;
+
+ for (int i = 0; i < 100; i++) {
+ DistributionSchedule.WriteSet w = writeSetFromValues(
+ 1, 2, 3 & mask, 4 & mask, 5 & mask, 6);
+ shuffleWithMask(w, mask, maskBits);
+ assertEquals(w.get(0), 1);
+ assertEquals(w.get(1), 2);
+ assertEquals(w.get(5), 6);
+
+ if (w.get(3) == (3 & mask)
+ || w.get(4) == (3 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(2) != (3 & mask)) {
+ fail("3 not found");
+ }
+
+ if (w.get(2) == (4 & mask)
+ || w.get(4) == (4 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(3) != (4 & mask)) {
+ fail("4 not found");
+ }
+
+ if (w.get(2) == (5 & mask)
+ || w.get(3) == (5 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(4) != (5 & mask)) {
+ fail("5 not found");
+ }
+ }
+ assertTrue(shuffleOccurred);
+
+ // at start of array
+ shuffleOccurred = false;
+ for (int i = 0; i < 100; i++) {
+ DistributionSchedule.WriteSet w = writeSetFromValues(
+ 1 & mask, 2 & mask, 3 & mask, 4, 5, 6);
+ shuffleWithMask(w, mask, maskBits);
+ assertEquals(w.get(3), 4);
+ assertEquals(w.get(4), 5);
+ assertEquals(w.get(5), 6);
+
+ if (w.get(1) == (1 & mask)
+ || w.get(2) == (1 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(0) != (1 & mask)) {
+ fail("1 not found");
+ }
+
+ if (w.get(0) == (2 & mask)
+ || w.get(2) == (2 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(1) != (2 & mask)) {
+ fail("2 not found");
+ }
+
+ if (w.get(0) == (3 & mask)
+ || w.get(1) == (3 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(2) != (3 & mask)) {
+ fail("3 not found");
+ }
+ }
+ assertTrue(shuffleOccurred);
+
+ // at end of array
+ shuffleOccurred = false;
+ for (int i = 0; i < 100; i++) {
+ DistributionSchedule.WriteSet w = writeSetFromValues(
+ 1, 2, 3, 4 & mask, 5 & mask, 6 & mask);
+ shuffleWithMask(w, mask, maskBits);
+ assertEquals(w.get(0), 1);
+ assertEquals(w.get(1), 2);
+ assertEquals(w.get(2), 3);
+
+ if (w.get(4) == (4 & mask)
+ || w.get(5) == (4 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(3) != (4 & mask)) {
+ fail("4 not found");
+ }
+
+ if (w.get(3) == (5 & mask)
+ || w.get(5) == (5 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(4) != (5 & mask)) {
+ fail("5 not found");
+ }
+
+ if (w.get(3) == (6 & mask)
+ || w.get(4) == (6 & mask)) {
+ shuffleOccurred = true;
+ } else if (w.get(5) != (6 & mask)) {
+ fail("6 not found");
+ }
+ }
+ assertTrue(shuffleOccurred);
+ }
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index da11e14..d2cfcc4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -19,12 +19,14 @@ package org.apache.bookkeeper.client;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
@@ -47,7 +49,9 @@ import org.slf4j.LoggerFactory;
import junit.framework.TestCase;
+
import static
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.*;
+import static
org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
@@ -57,7 +61,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
RegionAwareEnsemblePlacementPolicy repp;
final ClientConfiguration conf = new ClientConfiguration();
final ArrayList<BookieSocketAddress> ensemble = new
ArrayList<BookieSocketAddress>();
- final List<Integer> writeSet = new ArrayList<Integer>();
+ DistributionSchedule.WriteSet writeSet =
DistributionSchedule.NULL_WRITE_SET;
BookieSocketAddress addr1, addr2, addr3, addr4;
HashedWheelTimer timer;
@@ -89,9 +93,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends
TestCase {
ensemble.add(addr2);
ensemble.add(addr3);
ensemble.add(addr4);
- for (int i = 0; i < 4; i++) {
- writeSet.add(i);
- }
+
+ writeSet = writeSetFromValues(0,1,2,3);
timer = new HashedWheelTimer(
new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
@@ -116,9 +119,10 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
- List<Integer> reorderSet = repp.reorderReadSequence(ensemble,
writeSet, new HashMap<BookieSocketAddress, Long>());
- assertFalse(reorderSet == writeSet);
- assertEquals(writeSet, reorderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ assertEquals(origWriteSet, reorderSet);
}
@Test
@@ -139,14 +143,14 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- List<Integer> reorderSet = repp.reorderReadSequence(ensemble,
writeSet, new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(0);
- expectedSet.add(3);
- expectedSet.add(1);
- expectedSet.add(2);
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(),
writeSet.copy());
+ DistributionSchedule.WriteSet expectedSet = writeSetFromValues(0, 3,
1, 2);
+ LOG.info("write set : {}", writeSet);
LOG.info("reorder set : {}", reorderSet);
- assertFalse(reorderSet == writeSet);
+ LOG.info("expected set : {}", expectedSet);
+ LOG.info("reorder equals {}", reorderSet.equals(writeSet));
+ assertFalse(reorderSet.equals(writeSet));
assertEquals(expectedSet, reorderSet);
}
@@ -158,10 +162,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(writeSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ LOG.info("reorder set : {}", reorderSet);
+ assertEquals(origWriteSet, reorderSet);
}
@Test
@@ -182,15 +187,14 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.remove(addr1);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(3);
- expectedSet.add(1);
- expectedSet.add(2);
- expectedSet.add(0);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(3, 1, 2, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -213,15 +217,14 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
ro.add(addr1);
repp.onClusterChanged(addrs, ro);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(3);
- expectedSet.add(1);
- expectedSet.add(2);
- expectedSet.add(0);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(3, 1, 2, 0);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -243,15 +246,14 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
addrs.remove(addr2);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
new HashMap<BookieSocketAddress, Long>());
- List<Integer> expectedSet = new ArrayList<Integer>();
- expectedSet.add(3);
- expectedSet.add(2);
- expectedSet.add(0);
- expectedSet.add(1);
- LOG.info("reorder set : {}", reoderSet);
- assertFalse(reoderSet == writeSet);
- assertEquals(expectedSet, reoderSet);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(), writeSet);
+ DistributionSchedule.WriteSet expectedSet
+ = writeSetFromValues(3, 2, 0, 1);
+ LOG.info("reorder set : {}", reorderSet);
+ assertFalse(reorderSet.equals(origWriteSet));
+ assertEquals(expectedSet, reorderSet);
}
@Test
@@ -1038,15 +1040,20 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = ensemble.size();
for (int i = 0; i < ensembleSize; i++) {
- List<Integer> writeSet = ds.getWriteSet(i);
- List<Integer> readSet;
+ DistributionSchedule.WriteSet writeSet = ds.getWriteSet(i);
+ DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
+ DistributionSchedule.WriteSet readSet;
if (isReadLAC) {
- readSet = repp.reorderReadLACSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadLACSequence(
+ ensemble,
+ new HashMap<BookieSocketAddress, Long>(), writeSet);
} else {
- readSet = repp.reorderReadSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadSequence(
+ ensemble,
+ new HashMap<BookieSocketAddress, Long>(), writeSet);
}
- LOG.info("Reorder {} => {}.", writeSet, readSet);
+ LOG.info("Reorder {} => {}.", origWriteSet, readSet);
// first few nodes less than REMOTE_NODE_IN_REORDER_SEQUENCE
should be local region
int k = 0;
@@ -1089,13 +1096,19 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = ensemble.size();
for (int i = 0; i < ensembleSize; i++) {
- List<Integer> writeSet = ds.getWriteSet(i);
- List<Integer> readSet;
+ DistributionSchedule.WriteSet writeSet = ds.getWriteSet(i);
+ DistributionSchedule.WriteSet readSet;
if (isReadLAC) {
- readSet = repp.reorderReadLACSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadLACSequence(
+ ensemble,
+ new HashMap<BookieSocketAddress, Long>(),
+ writeSet.copy());
} else {
- readSet = repp.reorderReadSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadSequence(
+ ensemble,
+ new HashMap<BookieSocketAddress, Long>(),
+ writeSet.copy());
}
assertEquals(writeSet, readSet);
@@ -1124,10 +1137,11 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
}
static void appendBookieIndexByRegion(ArrayList<BookieSocketAddress>
ensemble,
- List<Integer> writeSet,
+ DistributionSchedule.WriteSet
writeSet,
String region,
List<Integer> finalSet) {
- for (int bi : writeSet) {
+ for (int i = 0; i < writeSet.size(); i++) {
+ int bi = writeSet.get(i);
String r =
StaticDNSResolver.getRegion(ensemble.get(bi).getHostName());
if (r.equals(region)) {
finalSet.add(bi);
@@ -1159,12 +1173,16 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
int ensembleSize = ensemble.size();
for (int i = 0; i < ensembleSize; i++) {
- List<Integer> writeSet = ds.getWriteSet(i);
- List<Integer> readSet;
+ DistributionSchedule.WriteSet writeSet = ds.getWriteSet(i);
+ DistributionSchedule.WriteSet readSet;
if (isReadLAC) {
- readSet = repp.reorderReadLACSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadLACSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(),
+ writeSet.copy());
} else {
- readSet = repp.reorderReadSequence(ensemble, writeSet, new
HashMap<BookieSocketAddress, Long>());
+ readSet = repp.reorderReadSequence(
+ ensemble, new HashMap<BookieSocketAddress, Long>(),
+ writeSet.copy());
}
LOG.info("Reorder {} => {}.", writeSet, readSet);
@@ -1176,8 +1194,10 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
appendBookieIndexByRegion(ensemble, writeSet, readOnlyRegion,
expectedReadSet);
// unavailable bookies
appendBookieIndexByRegion(ensemble, writeSet, unavailableRegion,
expectedReadSet);
-
- assertEquals(expectedReadSet, readSet);
+ assertEquals(expectedReadSet.size(), readSet.size());
+ for (int j = 0; j < expectedReadSet.size(); j++) {
+ assertEquals(expectedReadSet.get(j).intValue(),
readSet.get(j));
+ }
}
}
@@ -1229,9 +1249,8 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
ensemble.add(addr7);
ensemble.add(addr8);
- for (int i = 4; i < 8; i++) {
- writeSet.add(i);
- }
+ DistributionSchedule.WriteSet writeSet2
+ = writeSetFromValues(0,1,2,3,4,5,6,7);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
@@ -1251,7 +1270,9 @@ public class TestRegionAwareEnsemblePlacementPolicy
extends TestCase {
bookieFailures.put(addr3, 24L);
bookieFailures.put(addr4, 25L);
- List<Integer> reoderSet = repp.reorderReadSequence(ensemble, writeSet,
bookieFailures);
+ LOG.info("write set : {}", writeSet2);
+ DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
+ ensemble, bookieFailures, writeSet2);
LOG.info("reorder set : {}", reoderSet);
assertEquals(ensemble.get(reoderSet.get(0)), addr6);
assertEquals(ensemble.get(reoderSet.get(1)), addr7);
--
To stop receiving notification emails like this one, please contact
[email protected].