Repository: bookkeeper Updated Branches: refs/heads/master 1e4ccaf16 -> 0583175de
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java new file mode 100644 index 0000000..88c5eb1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.proto; + +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; +import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; +import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.apache.bookkeeper.util.MathUtils; +import org.jboss.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable { + private final static Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class); + + public GetBookieInfoProcessorV3(Request request, Channel channel, + BookieRequestProcessor requestProcessor) { + super(request, channel, requestProcessor); + } + + private GetBookieInfoResponse getGetBookieInfoResponse() { + long startTimeNanos = MathUtils.nowInNano(); + GetBookieInfoRequest getBookieInfoRequest = request.getGetBookieInfoRequest(); + long requested = getBookieInfoRequest.getRequested(); + + GetBookieInfoResponse.Builder getBookieInfoResponse = GetBookieInfoResponse.newBuilder(); + + if (!isVersionCompatible()) { + getBookieInfoResponse.setStatus(StatusCode.EBADVERSION); + requestProcessor.getBookieInfoStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), + TimeUnit.NANOSECONDS); + return getBookieInfoResponse.build(); + } + + LOG.debug("Received new getBookieInfo request: {}", request); + StatusCode status = StatusCode.EOK; + long freeDiskSpace = 0L, totalDiskSpace = 0L; + if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) { + freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace(); + getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace); + } + if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) { + totalDiskSpace = 
requestProcessor.bookie.getTotalDiskSpace(); + getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace); + } + LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace); + getBookieInfoResponse.setStatus(status); + requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), + TimeUnit.NANOSECONDS); + return getBookieInfoResponse.build(); + } + + @Override + public void safeRun() { + GetBookieInfoResponse getBookieInfoResponse = getGetBookieInfoResponse(); + sendResponse(getBookieInfoResponse); + } + + private void sendResponse(GetBookieInfoResponse getBookieInfoResponse) { + Response.Builder response = Response.newBuilder() + .setHeader(getHeader()) + .setStatus(getBookieInfoResponse.getStatus()) + .setGetBookieInfoResponse(getBookieInfoResponse); + sendResponse(response.getStatus(), + response.build(), + requestProcessor.getBookieInfoStats); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 580a905..f6e9e8f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -32,25 +32,29 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import com.google.protobuf.ByteString; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; +import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; -import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest; -import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion; -import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest; -import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -127,6 +131,7 @@ public class 
PerChannelBookieClient extends SimpleChannelHandler implements Chan final int addEntryTimeout; final int readEntryTimeout; final int maxFrameSize; + final int getBookieInfoTimeout; private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>(); @@ -139,6 +144,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private final OpStatsLogger addTimeoutOpLogger; private final OpStatsLogger writeLacTimeoutOpLogger; private final OpStatsLogger readLacTimeoutOpLogger; + private final OpStatsLogger getBookieInfoOpLogger; + private final OpStatsLogger getBookieInfoTimeoutOpLogger; /** * The following member variables do not need to be concurrent, or volatile @@ -194,6 +201,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan this.requestTimer = requestTimer; this.addEntryTimeout = conf.getAddEntryTimeout(); this.readEntryTimeout = conf.getReadEntryTimeout(); + this.getBookieInfoTimeout = conf.getBookieInfoTimeout(); this.authProviderFactory = authProviderFactory; this.extRegistry = extRegistry; @@ -209,10 +217,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP); writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP); readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP); + getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP); readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ); addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD); writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC); readLacTimeoutOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC); + getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO); this.pcbcPool = pcbcPool; @@ -674,6 +684,58 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } + public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) { + final long txnId = getTxnId(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.GET_BOOKIE_INFO); + completionObjects.put(completionKey, + new GetBookieInfoCompletion(this, getBookieInfoOpLogger, cb, ctx, + scheduleTimeout(completionKey, getBookieInfoTimeout))); + + // Build the request and calculate the total size to be included in the packet. + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.GET_BOOKIE_INFO) + .setTxnId(txnId); + + GetBookieInfoRequest.Builder getBookieInfoBuilder = GetBookieInfoRequest.newBuilder() + .setRequested(requested); + + final Request getBookieInfoRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setGetBookieInfoRequest(getBookieInfoBuilder) + .build(); + + final Channel c = channel; + if (c == null) { + errorOutReadKey(completionKey); + return; + } + + try{ + ChannelFuture future = c.write(getBookieInfoRequest); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully wrote request {} to {}", + getBookieInfoRequest, c.getRemoteAddress()); + } + } else { + if (!(future.getCause() instanceof ClosedChannelException)) { + LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ", + new Object[] { requested, c, future.getCause() }); + } + errorOutReadKey(completionKey); + } + } + }); + } catch(Throwable e) { + LOG.warn("Get 
metadata operation {} failed", getBookieInfoRequest, e); + errorOutReadKey(completionKey); + } + } + /** * Disconnects the bookie client. It can be reused. */ @@ -848,6 +910,29 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan }); } + void errorOutGetBookieInfoKey(final CompletionKey key) { + errorOutGetBookieInfoKey(key, BKException.Code.BookieHandleNotAvailableException); + } + + void errorOutGetBookieInfoKey(final CompletionKey key, final int rc) { + final GetBookieInfoCompletion getBookieInfoCompletion = (GetBookieInfoCompletion)completionObjects.remove(key); + if (null == getBookieInfoCompletion) { + return; + } + executor.submit(new SafeRunnable() { + @Override + public void safeRun() { + String bAddress = "null"; + Channel c = channel; + if (c != null) { + bAddress = c.getRemoteAddress().toString(); + } + LOG.debug("Could not write getBookieInfo request for bookie: {}", new Object[] {bAddress}); + getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new BookieInfo(), getBookieInfoCompletion.ctx); + } + }); + } + /** * Errors out pending entries. We call this method from one thread to avoid * concurrent executions to QuorumOpMonitor (implements callbacks). It seems @@ -1009,6 +1094,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan case READ_LAC: handleReadLacResponse(response.getReadLacResponse(), completionValue); break; + case GET_BOOKIE_INFO: + handleGetBookieInfoResponse(response, completionValue); + break; default: LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", type, addr); @@ -1134,6 +1222,33 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx); } + void handleGetBookieInfoResponse(Response response, CompletionValue completionValue) { + // The completion value should always be an instance of a GetBookieInfoCompletion object when we reach here. 
+ GetBookieInfoCompletion rc = (GetBookieInfoCompletion)completionValue; + GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse(); + + long freeDiskSpace = getBookieInfoResponse.hasFreeDiskSpace() ? getBookieInfoResponse.getFreeDiskSpace() : 0L; + long totalDiskCapacity = getBookieInfoResponse.hasTotalDiskCapacity() ? getBookieInfoResponse.getTotalDiskCapacity() : 0L; + + StatusCode status = response.getStatus() == StatusCode.EOK ? getBookieInfoResponse.getStatus() : response.getStatus(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Got response for read metadata request from bookie: {} rc {}", addr, rc); + } + + // convert to BKException code because thats what the upper + // layers expect. This is UGLY, there should just be one set of + // error codes. + Integer rcToRet = statusCodeToExceptionCode(status); + if (null == rcToRet) { + LOG.error("Read metadata failed on bookie:{} with code:{}", + new Object[] { addr, status }); + rcToRet = BKException.Code.ReadException; + } + LOG.debug("Response received from bookie info read: freeDiskSpace=" + freeDiskSpace + " totalDiskSpace:" + totalDiskCapacity); + rc.cb.getBookieInfoComplete(rcToRet, new BookieInfo(totalDiskCapacity, freeDiskSpace), rc.ctx); + } + /** * Boiler-plate wrapper classes follow * @@ -1257,6 +1372,42 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } // visible for testing + static class GetBookieInfoCompletion extends CompletionValue { + final GetBookieInfoCallback cb; + + public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, GetBookieInfoCallback cb, Object ctx) { + this(pcbc, null, cb, ctx, null); + } + + public GetBookieInfoCompletion(final PerChannelBookieClient pcbc, final OpStatsLogger getBookieInfoOpLogger, + final GetBookieInfoCallback originalCallback, + final Object originalCtx, final Timeout timeout) { + super(originalCtx, 0L, 0L, timeout); + final long startTime = MathUtils.nowInNano(); + this.cb = (null == 
getBookieInfoOpLogger) ? originalCallback : new GetBookieInfoCallback() { + @Override + public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { + cancelTimeout(); + if (getBookieInfoOpLogger != null) { + long latency = MathUtils.elapsedNanos(startTime); + if (rc != BKException.Code.OK) { + getBookieInfoOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + } else { + getBookieInfoOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + } + } + + if (rc != BKException.Code.OK && !expectedBkOperationErrors.contains(rc)) { + pcbc.recordError(); + } + + originalCallback.getBookieInfoComplete(rc, bInfo, originalCtx); + } + }; + } + } + + // visible for testing static class AddCompletion extends CompletionValue { final WriteCallback cb; @@ -1355,9 +1506,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } else if (OperationType.WRITE_LAC == operationType) { errorOutWriteLacKey(this, BKException.Code.TimeoutException); writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); - } else { + } else if (OperationType.READ_LAC == operationType) { errorOutReadLacKey(this, BKException.Code.TimeoutException); readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); + } else { + errorOutGetBookieInfoKey(this, BKException.Code.TimeoutException); + getBookieInfoTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto index 9ce9baf..504e231 100644 --- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto +++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto @@ -59,6 +59,7 @@ enum OperationType { 
AUTH = 5; WRITE_LAC = 6; READ_LAC = 7; + GET_BOOKIE_INFO = 8; } /** @@ -78,6 +79,7 @@ message Request { optional AuthMessage authRequest = 102; optional WriteLacRequest writeLacRequest = 103; optional ReadLacRequest readLacRequest = 104; + optional GetBookieInfoRequest getBookieInfoRequest = 105; } message ReadRequest { @@ -114,6 +116,15 @@ message ReadLacRequest { required int64 ledgerId = 1; } +message GetBookieInfoRequest { + enum Flags { + TOTAL_DISK_CAPACITY = 0x01; + FREE_DISK_SPACE = 0x02; + } + // bitwise OR of Flags + optional int64 requested = 1; +} + message Response { required BKPacketHeader header = 1; @@ -126,6 +137,7 @@ message Response { optional AuthMessage authResponse = 102; optional WriteLacResponse writeLacResponse = 103; optional ReadLacResponse readLacResponse = 104; + optional GetBookieInfoResponse getBookieInfoResponse = 105; } message ReadResponse { @@ -157,3 +169,9 @@ message ReadLacResponse { optional bytes lacBody = 3; // lac sent by PutLacRequest optional bytes lastEntryBody = 4; // Actual last entry on the disk } + +message GetBookieInfoResponse { + required StatusCode status = 1; + optional int64 totalDiskCapacity = 2; + optional int64 freeDiskSpace = 3; +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java new file mode 100644 index 0000000..2f885c0 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java @@ -0,0 +1,452 @@ +package org.apache.bookkeeper.client; +/* +* +* Licensed to the Apache Software Foundation (ASF) under one 
+* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.util.MathUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +/** + * Tests of the main BookKeeper client + */ +public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperClusterTestCase { + private final static Logger LOG = LoggerFactory.getLogger(BookKeeperDiskSpaceWeightedLedgerPlacementTest.class); + + public BookKeeperDiskSpaceWeightedLedgerPlacementTest() { + super(10); + } + + private BookieServer restartBookie(ServerConfiguration conf, final long initialFreeDiskSpace, + final long finallFreeDiskSpace, final int delaySecs) throws Exception { + Bookie bookieWithCustomFreeDiskSpace = new Bookie(conf) { + long startTime = 
System.currentTimeMillis(); + @Override + public long getTotalFreeSpace() { + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + if (delaySecs == 0 || ((System.currentTimeMillis()) - startTime < delaySecs*1000)) { + return initialFreeDiskSpace; + } else { + // after delaySecs, advertise finallFreeDiskSpace; before that advertise initialFreeDiskSpace + return finallFreeDiskSpace; + } + } + }; + bsConfs.add(conf); + BookieServer server = startBookie(conf, bookieWithCustomFreeDiskSpace); + bs.add(server); + return server; + } + + private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, final long freeDiskSpace) + throws Exception { + LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress()); + bs.get(bookieIdx).getLocalAddress(); + ServerConfiguration conf = killBookie(bookieIdx); + return restartBookie(conf, freeDiskSpace, freeDiskSpace, 0); + } + + private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(BookieServer bookie, final long freeDiskSpace) + throws Exception { + for (int i=0; i < bs.size(); i++) { + if (bs.get(i).getLocalAddress().equals(bookie.getLocalAddress())) { + return replaceBookieWithCustomFreeDiskSpaceBookie(i, freeDiskSpace); + } + } + return null; + } + + private BookieServer replaceBookieWithCustomFreeDiskSpaceBookie(int bookieIdx, long initialFreeDiskSpace, + long finalFreeDiskSpace, int delay) throws Exception { + LOG.info("Killing bookie " + bs.get(bookieIdx).getLocalAddress()); + bs.get(bookieIdx).getLocalAddress(); + ServerConfiguration conf = killBookie(bookieIdx); + return restartBookie(conf, initialFreeDiskSpace, finalFreeDiskSpace, delay); + } + + /** + * Test to show that weight based selection honors the disk weight of bookies + */ + @Test(timeout=60000) + public void testDiskSpaceWeightedBookieSelection() throws Exception { + long freeDiskSpace=1000000L; + int multiple=3; + for (int i=0; i < numBookies; i++) { + // the first 8 bookies have freeDiskSpace of 1MB; While the 
remaining 2 have 3MB + if (i < numBookies-2) { + replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace); + } else { + replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace); + } + } + Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>(); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + + // wait 200 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client + ClientConfiguration conf = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true). + setBookieMaxWeightMultipleForWeightBasedPlacement(multiple); + Thread.sleep(200); + final BookKeeper client = new BookKeeper(conf); + Thread.sleep(200); + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + client.close(); + // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median; + // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X + for (int i=0; i < numBookies-2; i++) { + double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1); + double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1); + } + } + + /** + * Test to show that weight based selection honors the disk weight of bookies and also adapts + * when the bookies' weights change. 
+ */ + @Test(timeout=60000) + public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception { + long freeDiskSpace=1000000L; + int multiple=3; + for (int i=0; i < numBookies; i++) { + // the first 8 bookies have freeDiskSpace of 1MB; While the remaining 2 have 3MB + if (i < numBookies-2) { + replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace); + } else { + replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace); + } + } + Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>(); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + + // wait a 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client + ClientConfiguration conf = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true). + setBookieMaxWeightMultipleForWeightBasedPlacement(multiple); + Thread.sleep(100); + final BookKeeper client = new BookKeeper(conf); + Thread.sleep(100); + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median; + // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X + for (int i=0; i < numBookies-2; i++) { + double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1); + double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1); + } + + // Restart 
the bookies in such a way that the first 2 bookies go from 1MB to 3MB free space and the last + // 2 bookies go from 3MB to 1MB + BookieServer server1 = bs.get(0); + BookieServer server2 = bs.get(1); + BookieServer server3 = bs.get(numBookies-2); + BookieServer server4 = bs.get(numBookies-1); + + server1 = replaceBookieWithCustomFreeDiskSpaceBookie(server1, multiple*freeDiskSpace); + server2 = replaceBookieWithCustomFreeDiskSpaceBookie(server2, multiple*freeDiskSpace); + server3 = replaceBookieWithCustomFreeDiskSpaceBookie(server3, freeDiskSpace); + server4 = replaceBookieWithCustomFreeDiskSpaceBookie(server4, freeDiskSpace); + + Thread.sleep(100); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median; + // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X + for (int i=0; i < numBookies; i++) { + if (server1.getLocalAddress().equals(bs.get(i).getLocalAddress()) || + server2.getLocalAddress().equals(bs.get(i).getLocalAddress())) { + continue; + } + double ratio1 = (double)m.get(server1.getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1); + double ratio2 = (double)m.get(server2.getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1); + } + client.close(); + } + + /** + * Test to show that weight based selection honors the disk weight of bookies and also adapts + * when bookies go away permanently. 
+ */ + @Test(timeout=60000) + public void testDiskSpaceWeightedBookieSelectionWithBookiesDying() throws Exception { + long freeDiskSpace=1000000L; + int multiple=3; + for (int i=0; i < numBookies; i++) { + // the first 8 bookies have freeDiskSpace of 1MB; While the remaining 2 have 3MB + if (i < numBookies-2) { + replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace); + } else { + replaceBookieWithCustomFreeDiskSpaceBookie(0, multiple*freeDiskSpace); + } + } + Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>(); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + + // wait 100 msecs each for the bookies to come up and the bookieInfo to be retrieved by the client + ClientConfiguration conf = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true). + setBookieMaxWeightMultipleForWeightBasedPlacement(multiple); + Thread.sleep(100); + final BookKeeper client = new BookKeeper(conf); + Thread.sleep(100); + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight are chosen 3X as often as the median; + // since the number of ledgers is small (2000), there may be variation + double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(0).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1); + double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(1).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1); + + // Bring down the 2 bookies that had higher weight; after this the allocation to all + // the remaining 
bookies should be uniform + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + BookieServer server1 = bs.get(numBookies-2); + BookieServer server2 = bs.get(numBookies-1); + killBookie(numBookies-1); + killBookie(numBookies-2); + + // give some time for the cluster to become stable + Thread.sleep(100); + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight are chosen 3X as often as the median; + for (int i=0; i < numBookies-3; i++) { + double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress())); + delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be less than 30% + } + // since the following 2 bookies were down, they shouldn't ever be selected + assertTrue("Weigheted placement is not honored" + m.get(server1.getLocalAddress()), + m.get(server1.getLocalAddress()) == 0); + assertTrue("Weigheted placement is not honored" + m.get(server2.getLocalAddress()), + m.get(server2.getLocalAddress()) == 0); + + client.close(); + } + + /** + * Test to show that weight based selection honors the disk weight of bookies and also adapts + * when bookies are added. 
+ */ + @Test(timeout=60000) + public void testDiskSpaceWeightedBookieSelectionWithBookiesBeingAdded() throws Exception { + long freeDiskSpace=1000000L; + int multiple=3; + for (int i=0; i < numBookies; i++) { + // all the bookies have freeDiskSpace of 1MB + replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace); + } + // let the last two bookies be down initially + ServerConfiguration conf1 = killBookie(numBookies-1); + ServerConfiguration conf2 = killBookie(numBookies-2); + Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>(); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + + // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client + ClientConfiguration conf = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true). + setBookieMaxWeightMultipleForWeightBasedPlacement(multiple); + Thread.sleep(100); + final BookKeeper client = new BookKeeper(conf); + Thread.sleep(100); + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight are chosen 3X as often as the median; + // since the number of ledgers is small (2000), there may be variation + for (int i=0; i < numBookies-3; i++) { + double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress())); + delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be less than 30% + } + + // bring up the two dead bookies; they'll also have 3X more free space than the rest of the bookies + restartBookie(conf1, multiple*freeDiskSpace, multiple*freeDiskSpace, 0); + restartBookie(conf2, multiple*freeDiskSpace, 
multiple*freeDiskSpace, 0); + + // give some time for the cluster to become stable + Thread.sleep(100); + for (BookieServer b : bs) { + m.put(b.getLocalAddress(), 0); + } + for (int i = 0; i < 2000; i++) { + LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes()); + for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) { + m.put(b, m.get(b)+1); + } + } + + // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median; + // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X + for (int i=0; i < numBookies-2; i++) { + double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1); + double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress()); + assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1); + } + client.close(); + } + + /** + * Tests that the bookie selection is based on the amount of free disk space a bookie has. Also make sure that + * the periodic bookieInfo read is working and causes the new weights to be taken into account. 
+     */
+    @Test(timeout=60000)
+    public void testDiskSpaceWeightedBookieSelectionWithPeriodicBookieInfoUpdate() throws Exception {
+        long freeDiskSpace=1000000L;
+        int multiple=3;
+        for (int i=0; i < numBookies; i++) {
+            // all but the last 2 replacements advertise freeDiskSpace (1MB) throughout; the last 2
+            // start out advertising 1MB and later switch to multiple*freeDiskSpace (3MB).
+            // NOTE(review): the previous comment said the switch happens after 3 seconds, but the
+            // last argument passed below is 2 -- confirm its unit and meaning against the helper
+            if (i < numBookies-2) {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace);
+            } else {
+                replaceBookieWithCustomFreeDiskSpaceBookie(0, freeDiskSpace, multiple*freeDiskSpace, 2);
+            }
+        }
+        // per-bookie count of how many ensembles the bookie was picked for
+        Map<BookieSocketAddress, Integer> m = new HashMap<BookieSocketAddress, Integer>();
+        for (BookieServer b : bs) {
+            m.put(b.getLocalAddress(), 0);
+        }
+
+        // the client is configured to re-read the bookieInfo once every updateIntervalSecs seconds
+        int updateIntervalSecs = 6;
+        long updateIntervalMsecs = updateIntervalSecs*1000;
+        ClientConfiguration conf = new ClientConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString()).setDiskWeightBasedPlacementEnabled(true).
+            setBookieMaxWeightMultipleForWeightBasedPlacement(multiple).
+            setGetBookieInfoIntervalSeconds(updateIntervalSecs, TimeUnit.SECONDS);
+        // wait a bit for the bookies to come up and the bookieInfo to be retrieved by the client
+        Thread.sleep(100);
+        final BookKeeper client = new BookKeeper(conf);
+        try {
+            Thread.sleep(100);
+            long startMsecs = MathUtils.now();
+            for (int i = 0; i < 2000; i++) {
+                LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+                for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                    m.put(b, m.get(b)+1);
+                }
+            }
+            long elapsedMsecs = MathUtils.now() - startMsecs;
+
+            // make sure that all the bookies are chosen pretty much uniformly
+            int bookiesToCheck = numBookies-1;
+            if (elapsedMsecs > updateIntervalMsecs) {
+                // if creating the ledgers took longer than the update interval, the weight for the
+                // last 2 bookies may already be higher, so skip checking them
+                bookiesToCheck = numBookies-3;
+            }
+            for (int i=0; i < bookiesToCheck; i++) {
+                double delta = Math.abs((double)m.get(bs.get(i).getLocalAddress())-(double)m.get(bs.get(i+1).getLocalAddress()));
+                delta = (delta*100)/(double)m.get(bs.get(i+1).getLocalAddress());
+                assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be <30%
+            }
+
+            if (elapsedMsecs < updateIntervalMsecs) {
+                // sleep until periodic bookie info retrieval kicks in and it gets the updated
+                // freeDiskSpace for the last 2 bookies
+                Thread.sleep(updateIntervalMsecs - elapsedMsecs);
+            }
+
+            // restart the counting from zero for the second round
+            for (BookieServer b : bs) {
+                m.put(b.getLocalAddress(), 0);
+            }
+            for (int i = 0; i < 2000; i++) {
+                LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
+                for (BookieSocketAddress b : lh.getLedgerMetadata().getEnsemble(0)) {
+                    m.put(b, m.get(b)+1);
+                }
+            }
+
+            // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
+            // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
+            for (int i=0; i < numBookies-2; i++) {
+                double ratio1 = (double)m.get(bs.get(numBookies-2).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+                assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1-multiple), Math.abs(ratio1-multiple) < 1);
+                double ratio2 = (double)m.get(bs.get(numBookies-1).getLocalAddress())/(double)m.get(bs.get(i).getLocalAddress());
+                assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2-multiple), Math.abs(ratio2-multiple) < 1);
+            }
+        } finally {
+            // close the client even when an assertion above fails, so it does not leak into later tests
+            client.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java new file mode 100644 index 0000000..ed41cb2 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java @@ -0,0 +1,141 @@ +package org.apache.bookkeeper.client; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieClient; +import org.apache.bookkeeper.proto.BookkeeperProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This unit test tests timeout of GetBookieInfo request; + * + */ +public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { + private final static Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class); + DigestType digestType; + public ClientSocketChannelFactory channelFactory; + public OrderedSafeExecutor executor; + + public TestGetBookieInfoTimeout() { + super(10); + this.digestType = DigestType.CRC32; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors + .newCachedThreadPool()); + executor = OrderedSafeExecutor.newBuilder() + .name("BKClientOrderedSafeExecutor") + .numThreads(2) + .build(); + } + + @After + public void tearDown() throws Exception { + channelFactory.releaseExternalResources(); + executor.shutdown(); + } + + @Test(timeout=60000) + public void testGetBookieInfoTimeout() throws Exception { 
+ + // connect to the bookies and create a ledger + LedgerHandle writelh = bkc.createLedger(3,3,digestType, "testPasswd".getBytes()); + String tmp = "Foobar"; + final int numEntries = 10; + for (int i = 0; i < numEntries; i++) { + writelh.addEntry(tmp.getBytes()); + } + + // set timeout for getBookieInfo to be 2 secs and cause one of the bookies to go to sleep for 3X that time + ClientConfiguration cConf = new ClientConfiguration(); + cConf.setGetBookieInfoTimeout(2); + + final BookieSocketAddress bookieToSleep = writelh.getLedgerMetadata().getEnsemble(0).get(0); + int sleeptime = cConf.getBookieInfoTimeout()*3; + CountDownLatch latch = sleepBookie(bookieToSleep, sleeptime); + latch.await(); + + // try to get bookie info from the sleeping bookie. It should fail with timeout error + BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(), + bookieToSleep.getPort()); + BookieClient bc = new BookieClient(cConf, channelFactory, executor); + long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | + BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; + + class CallbackObj { + int rc; + long requested; + @SuppressWarnings("unused") + long freeDiskSpace, totalDiskCapacity; + CountDownLatch latch = new CountDownLatch(1); + CallbackObj(long requested) { + this.requested = requested; + this.rc = 0; + this.freeDiskSpace = 0L; + this.totalDiskCapacity = 0L; + } + }; + CallbackObj obj = new CallbackObj(flags); + bc.getBookieInfo(addr, flags, new GetBookieInfoCallback() { + @Override + public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { + CallbackObj obj = (CallbackObj)ctx; + obj.rc=rc; + if (rc == Code.OK) { + if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) { + obj.freeDiskSpace = bInfo.getFreeDiskSpace(); + } + if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) { + 
obj.totalDiskCapacity = bInfo.getTotalDiskSpace(); + } + } + obj.latch.countDown(); + } + + }, obj); + obj.latch.await(); + LOG.debug("Return code: " + obj.rc); + assertTrue("GetBookieInfo failed with unexpected error code: " + obj.rc, obj.rc == Code.TimeoutException); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index bef6bc2..6739ea4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -33,6 +34,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import junit.framework.TestCase; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; @@ -420,6 +422,252 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); } + @Test(timeout = 60000) + public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new 
BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), + NetworkTopology.DEFAULT_RACK); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + // Update cluster + Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>(); + addrs.add(addr1); + addrs.add(addr2); + addrs.add(addr3); + addrs.add(addr4); + + int multiple = 10; + conf.setDiskWeightBasedPlacementEnabled(true); + conf.setBookieMaxWeightMultipleForWeightBasedPlacement(-1); // no max cap on weight + repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null); + + repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); + Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>(); + bookieInfoMap.put(addr1, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr2, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr3, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr4, new BookieInfo(multiple*100L, multiple*100L)); + repp.updateBookieInfo(bookieInfoMap); + + Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>(); + selectionCounts.put(addr3, 0L); + selectionCounts.put(addr4, 0L); + int numTries = 50000; + BookieSocketAddress replacedBookie; + for (int i = 0; i < numTries; i++) { + // replace node under r2 + replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new 
HashSet<BookieSocketAddress>()); + assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie)); + selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie)+1); + } + double observedMultiple = ((double)selectionCounts.get(addr4)/(double)selectionCounts.get(addr3)); + assertTrue("Weights not being honored " + observedMultiple, Math.abs(observedMultiple-multiple) < 1); + } + + @Test(timeout = 60000) + public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + // update dns mapping + StaticDNSResolver.reset(); + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r4"); + // Update cluster + Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>(); + addrs.add(addr1); + addrs.add(addr2); + addrs.add(addr3); + addrs.add(addr4); + + int multiple = 10, maxMultiple = 4; + conf.setDiskWeightBasedPlacementEnabled(true); + conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple); + repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null); + + repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); + Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>(); + 
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr2, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr3, new BookieInfo(200L, 200L)); + bookieInfoMap.put(addr4, new BookieInfo(multiple*100L, multiple*100L)); + repp.updateBookieInfo(bookieInfoMap); + + Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>(); + selectionCounts.put(addr1, 0L); + selectionCounts.put(addr2, 0L); + selectionCounts.put(addr3, 0L); + selectionCounts.put(addr4, 0L); + int numTries = 50000; + BookieSocketAddress replacedBookie; + for (int i = 0; i < numTries; i++) { + // addr2 is on /r2 and this is the only one on this rack. So the replacement + // will come from other racks. However, the weight should be honored in such + // selections as well + replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>()); + assertTrue(addr1.equals(replacedBookie) || addr3.equals(replacedBookie) || addr4.equals(replacedBookie)); + selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie)+1); + } + + double medianWeight = 150; + double medianSelectionCounts = (double)(medianWeight/bookieInfoMap.get(addr1).getWeight())*selectionCounts.get(addr1); + double observedMultiple1 = ((double)selectionCounts.get(addr4)/(double)medianSelectionCounts); + double observedMultiple2 = ((double)selectionCounts.get(addr4)/(double)selectionCounts.get(addr3)); + LOG.info("oM1 " + observedMultiple1 + " oM2 " + observedMultiple2); + assertTrue("Weights not being honored expected " + maxMultiple + " observed " + observedMultiple1, + Math.abs(observedMultiple1-maxMultiple) < 1); + double expected = (medianWeight*maxMultiple)/bookieInfoMap.get(addr3).getWeight();// expected multiple for addr3 + assertTrue("Weights not being honored expected " + expected + " observed " + observedMultiple2, + Math.abs(observedMultiple2-expected) < 1); + }
+
+    /**
+     * Builds new ensembles from a cluster in which racks /r2 and /r3 each hold three bookies of
+     * weight 100 and one bookie of weight 1000, and checks that every ensemble still spans racks
+     * and that the heavy bookie in each rack is selected about maxMultiple times as often as a
+     * light bookie in the same rack.
+     */
+    @Test(timeout = 60000)
+    public void testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+        BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+        BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+
+        // update dns mapping: addr1 on the default rack, addr2-addr5 on /r2, addr6-addr9 on /r3
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2");
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+        StaticDNSResolver.addNodeToRack(addr9.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3");
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        addrs.add(addr5);
+        addrs.add(addr6);
+        addrs.add(addr7);
+        addrs.add(addr8);
+        addrs.add(addr9);
+
+        int maxMultiple = 4;
+        conf.setDiskWeightBasedPlacementEnabled(true);
+        conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null);
+
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+        // addr5 (/r2) and addr9 (/r3) advertise 10X the value of every other bookie
+        Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
+        bookieInfoMap.put(addr6, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr7, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr8, new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr9, new BookieInfo(1000L, 1000L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        for (BookieSocketAddress b : addrs) {
+            selectionCounts.put(b, 0L);
+        }
+        int numTries = 10000;
+
+        Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
+        ArrayList<BookieSocketAddress> ensemble;
+        for (int i = 0; i < numTries; i++) {
+            // every ensemble must still be spread across racks; the weights only influence
+            // which bookie is picked within the candidates
+            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+            int coveredWriteQuorums = getNumCoveredWriteQuorums(ensemble, 2);
+            assertTrue("Rackaware selection not happening " + coveredWriteQuorums, coveredWriteQuorums >= 2);
+            for (BookieSocketAddress b : ensemble) {
+                selectionCounts.put(b, selectionCounts.get(b)+1);
+            }
+        }
+
+        // the median weight used is 100; addr2 and addr6 both have that weight, so their selection
+        // counts stand in for the median when computing the observed multiple of the heavy bookie
+        // in the same rack (capped at maxMultiple although the raw weight ratio is 10)
+        double observedMultiple1 = ((double)selectionCounts.get(addr5)/(double)selectionCounts.get(addr2));
+        double observedMultiple2 = ((double)selectionCounts.get(addr9)/(double)selectionCounts.get(addr6));
+        assertTrue("Weights not being honored expected " + maxMultiple + " observed " + observedMultiple1,
+                Math.abs(observedMultiple1-maxMultiple) < 0.5);
+        assertTrue("Weights not being honored expected " + maxMultiple + " observed " + observedMultiple2,
+                Math.abs(observedMultiple2-maxMultiple) < 0.5);
+    }
+
+ @Test(timeout = 60000) + public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_RACK); + StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r2"); + StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION 
+ "/r3"); + StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), NetworkTopology.DEFAULT_REGION + "/r3"); + // Update cluster + Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>(); + addrs.add(addr1); + addrs.add(addr2); + addrs.add(addr3); + addrs.add(addr4); + addrs.add(addr5); + + int maxMultiple = 4; + conf.setDiskWeightBasedPlacementEnabled(true); + conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple); + repp.initialize(conf, Optional.<DNSToSwitchMapping>absent(), timer, DISABLE_ALL, null); + + repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>()); + Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>(); + bookieInfoMap.put(addr1, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr2, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr3, new BookieInfo(1000L, 1000L)); + bookieInfoMap.put(addr4, new BookieInfo(100L, 100L)); + bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L)); + + repp.updateBookieInfo(bookieInfoMap); + + ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>(); + Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>(); + try { + excludeList.add(addr1); + excludeList.add(addr2); + excludeList.add(addr3); + excludeList.add(addr4); + ensemble = repp.newEnsemble(3, 2, 2, null, excludeList); + fail("Should throw BKNotEnoughBookiesException when there is not enough bookies" + ensemble); + } catch (BKNotEnoughBookiesException e) { + // this is expected + } + try { + ensemble = repp.newEnsemble(1, 1, 1, null, excludeList); + } catch (BKNotEnoughBookiesException e) { + fail("Should not throw BKNotEnoughBookiesException when there are enough bookies for the ensemble"); + } + } + private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize) throws Exception { int ensembleSize = ensemble.size(); 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java new file mode 100644 index 0000000..037ac80 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWeightedRandomSelection.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bookkeeper.client; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestWeightedRandomSelection { + + static final Logger LOG = LoggerFactory.getLogger(TestWeightedRandomSelection.class); + + static class TestObj implements WeightedObject { + long val; + TestObj(long value) { + this.val = value; + } + @Override + public long getWeight() { + return val; + } + } + + WeightedRandomSelection<String> wRS; + Configuration conf = new CompositeConfiguration(); + int multiplier = 3; + + @Before + public void setUp() throws Exception { + wRS = new WeightedRandomSelection<String>(); + } + + @After + public void tearDown() throws Exception { + } + + @Test(timeout = 60000) + public void testSelectionWithEqualWeights() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + + Long val=100L; + int numKeys = 50, totalTries = 1000000; + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + for (Integer i=0; i < numKeys; i++) { + map.put(i.toString(), new TestObj(val)); + randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + + // there should be uniform distribution + double expectedPct = ((double)1/(double)numKeys)*100; + for (Map.Entry<String, Integer> e : randomSelection.entrySet()) { + double actualPct = ((double)e.getValue()/(double)totalTries)*100; + double delta = 
(Math.abs(expectedPct-actualPct)/expectedPct)*100; + System.out.println("Key:" + e.getKey() + " Value:" + e.getValue() + " Expected: " + expectedPct + " Actual: " + actualPct); + // should be within 5% of expected + assertTrue("Not doing uniform selection when weights are equal", delta < 5); + } + } + + @Test(timeout = 60000) + public void testSelectionWithAllZeroWeights() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + + int numKeys = 50, totalTries = 1000000; + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + for (Integer i=0; i < numKeys; i++) { + map.put(i.toString(), new TestObj(0L)); + randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + + // when all the values are zeros, there should be uniform distribution + double expectedPct = ((double)1/(double)numKeys)*100; + for (Map.Entry<String, Integer> e : randomSelection.entrySet()) { + double actualPct = ((double)e.getValue()/(double)totalTries)*100; + double delta = (Math.abs(expectedPct-actualPct)/expectedPct)*100; + System.out.println("Key:" + e.getKey() + " Value:" + e.getValue() + " Expected: " + expectedPct + " Actual: " + actualPct); + // should be within 5% of expected + assertTrue("Not doing uniform selection when weights are equal", delta < 5); + } + } + + void verifyResult(Map<String, WeightedObject> map, Map<String, Integer> randomSelection, int multiplier, + long minWeight, long medianWeight, long totalWeight, int totalTries) { + List<Integer> values = new ArrayList<Integer>(randomSelection.values()); + Collections.sort(values); + double medianObserved, medianObservedWeight, medianExpectedWeight; + int mid = values.size()/2; + if ((values.size() % 2) == 1) { + medianObserved = values.get(mid); + } else { + medianObserved = (double)(values.get(mid-1) + values.get(mid))/2; + } + + 
medianObservedWeight = (double)medianObserved/(double)totalTries; + medianExpectedWeight = (double)medianWeight/totalWeight; + + for (Map.Entry<String, Integer> e : randomSelection.entrySet()) { + double observed = (((double)e.getValue()/(double)totalTries)); + + double expected; + if (map.get(e.getKey()).getWeight() == 0) { + // if the value is 0 for any key, we make it equal to the first non zero value + expected = (double)minWeight/(double)totalWeight; + } else { + expected = (double)map.get(e.getKey()).getWeight()/(double)totalWeight; + } + if (multiplier > 0 && expected > multiplier*medianExpectedWeight) { + expected = multiplier*medianExpectedWeight; + } + // We can't compare these weights because they are derived from different + // values. But if we express them as a multiple of the min in each, then + // they should be comparable + double expectedMultiple = expected/medianExpectedWeight; + double observedMultiple = observed/medianObservedWeight; + double delta = (Math.abs(expectedMultiple-observedMultiple)/expectedMultiple)*100; + System.out.println("Key:" + e.getKey() + " Value:" + e.getValue() + + " Expected " + expectedMultiple + " actual " + observedMultiple + " delta " + delta + "%"); + + // the observed should be within 5% of expected + assertTrue("Not doing uniform selection when weights are equal", delta < 5); + } + } + + @Test(timeout = 60000) + public void testSelectionWithSomeZeroWeights() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + int numKeys = 50; + multiplier=3; + long val=0L, total=0L, minWeight = 100L, medianWeight=minWeight; + wRS.setMaxProbabilityMultiplier(multiplier); + for (Integer i=0; i < numKeys; i++) { + if (i < numKeys/3) { + val = 0L; + } else if (i < 2*(numKeys/3)){ + val = minWeight; + } else { + val = 2*minWeight; + } + total += val; + map.put(i.toString(), new TestObj(val)); + 
randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + int totalTries = 10000000; + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries); + } + + @Test(timeout = 60000) + public void testSelectionWithUnequalWeights() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + int numKeys = 50; + multiplier=4; + long val=0L, total=0L, minWeight=100L, medianWeight=2*minWeight; + wRS.setMaxProbabilityMultiplier(multiplier); + for (Integer i=0; i < numKeys; i++) { + if (i < numKeys/3) { + val = minWeight; + } else if (i < 2*(numKeys/3)){ + val = 2*minWeight; + } else { + val = 10*minWeight; + } + total += val; + map.put(i.toString(), new TestObj(val)); + randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + int totalTries = 10000000; + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries); + } + + @Test(timeout = 60000) + public void testSelectionWithHotNode() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + + multiplier=3; // no max + int numKeys = 50; + long total=0L, minWeight = 100L, val = minWeight, medianWeight=minWeight; + wRS.setMaxProbabilityMultiplier(multiplier); + for (Integer i=0; i < numKeys; i++) { + if (i == numKeys-1) { + // last one has 10X more weight than the rest put together + val=10*(numKeys-1)*100L; + } + total += val; + map.put(i.toString(), new TestObj(val)); + randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + int totalTries = 
10000000; + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries); + } + + @Test(timeout = 60000) + public void testSelectionWithHotNodeWithLimit() throws Exception { + Map<String, WeightedObject> map = new HashMap<String, WeightedObject>(); + Map<String, Integer> randomSelection = new HashMap<String, Integer>(); + + multiplier=3; // limit the max load on hot node to be 3X + int numKeys = 50; + long total=0L, minWeight = 100L, val = minWeight, medianWeight=minWeight; + wRS.setMaxProbabilityMultiplier(multiplier); + for (Integer i=0; i < numKeys; i++) { + if (i == numKeys-1) { + // last one has 10X more weight than the rest put together + val=10*(numKeys-1)*100L; + } + total += val; + map.put(i.toString(), new TestObj(val)); + randomSelection.put(i.toString(), 0); + } + + wRS.updateMap(map); + int totalTries = 10000000; + for (int i = 0; i < totalTries; i++) { + String key = wRS.getNextRandom(); + randomSelection.put(key, randomSelection.get(key)+1); + } + verifyResult(map, randomSelection, multiplier, minWeight, medianWeight, total, totalTries); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 7760827..0698780 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -22,11 +22,15 @@ package org.apache.bookkeeper.test; */ import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import 
java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -34,6 +38,8 @@ import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.util.OrderedSafeExecutor; @@ -250,4 +256,48 @@ public class BookieClientTest { assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc); } } + + @Test(timeout=60000) + public void testGetBookieInfo() throws IOException, InterruptedException { + BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); + BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); + long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | + BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; + + class CallbackObj { + int rc; + long requested; + long freeDiskSpace, totalDiskCapacity; + CountDownLatch latch = new CountDownLatch(1); + CallbackObj(long requested) { + this.requested = requested; + this.rc = 0; + this.freeDiskSpace = 0L; + this.totalDiskCapacity = 0L; + } + }; + CallbackObj obj = new CallbackObj(flags); + bc.getBookieInfo(addr, flags, new GetBookieInfoCallback() { + @Override + public void 
getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { + CallbackObj obj = (CallbackObj)ctx; + obj.rc=rc; + if (rc == Code.OK) { + if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) { + obj.freeDiskSpace = bInfo.getFreeDiskSpace(); + } + if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) { + obj.totalDiskCapacity = bInfo.getTotalDiskSpace(); + } + } + obj.latch.countDown(); + } + + }, obj); + obj.latch.await(); + System.out.println("Return code: " + obj.rc + "FreeDiskSpace: " + obj.freeDiskSpace + " TotalCapacity: " + obj.totalDiskCapacity); + assertTrue("GetBookieInfo failed with error " + obj.rc, obj.rc == Code.OK); + assertTrue("GetBookieInfo failed with error " + obj.rc, obj.freeDiskSpace <= obj.totalDiskCapacity); + assertTrue("GetBookieInfo failed with error " + obj.rc, obj.totalDiskCapacity > 0); + } }
