Repository: bookkeeper Updated Branches: refs/heads/master 42e8f1294 -> c813b3d32
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index a4fb761..3fb73e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.auth.ClientAuthProvider; +import com.google.protobuf.ByteString; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -39,13 +40,22 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion; +import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import 
org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -124,7 +134,11 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private final OpStatsLogger readEntryOpLogger; private final OpStatsLogger readTimeoutOpLogger; private final OpStatsLogger addEntryOpLogger; + private final OpStatsLogger writeLacOpLogger; + private final OpStatsLogger readLacOpLogger; private final OpStatsLogger addTimeoutOpLogger; + private final OpStatsLogger writeLacTimeoutOpLogger; + private final OpStatsLogger readLacTimeoutOpLogger; /** * The following member variables do not need to be concurrent, or volatile @@ -192,8 +206,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP); addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP); + writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP); + readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP); readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ); addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD); + writeLacTimeoutOpLogger = 
statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC); + readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC); this.pcbcPool = pcbcPool; @@ -238,6 +256,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) { + //Thread.dumpStack(); closeLock.readLock().lock(); try { if (ConnectionState.CLOSED == state) { @@ -365,6 +384,60 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } + void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ChannelBuffer toSend, WriteLacCallback cb, Object ctx) { + final long txnId = getTxnId(); + final int entrySize = toSend.readableBytes(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.WRITE_LAC); + // writeLac is mostly like addEntry hence uses addEntryTimeout + completionObjects.put(completionKey, + new WriteLacCompletion(writeLacOpLogger, cb, ctx, lac, scheduleTimeout(completionKey, addEntryTimeout))); + + // Build the request + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.WRITE_LAC) + .setTxnId(txnId); + WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder() + .setLedgerId(ledgerId) + .setLac(lac) + .setMasterKey(ByteString.copyFrom(masterKey)) + .setBody(ByteString.copyFrom(toSend.toByteBuffer())); + + final Request writeLacRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setWriteLacRequest(writeLacBuilder) + .build(); + + final Channel c = channel; + if (c == null) { + errorOutWriteLacKey(completionKey); + return; + } + try { + ChannelFuture future = c.write(writeLacRequest); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + if 
(LOG.isDebugEnabled()) { + LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}", + ledgerId, c.getRemoteAddress()); + } + } else { + if (!(future.getCause() instanceof ClosedChannelException)) { + LOG.warn("Writing Lac(lid={} to channel {} failed : ", + new Object[] { ledgerId, c, future.getCause() }); + } + errorOutWriteLacKey(completionKey); + } + } + }); + } catch (Throwable e) { + LOG.warn("writeLac operation failed", e); + errorOutWriteLacKey(completionKey); + } + } + /** * This method should be called only after connection has been checked for * {@link #connectIfNeededAndDoOp(GenericCallback)} @@ -502,6 +575,52 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } + public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { + final long txnId = getTxnId(); + final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_LAC); + completionObjects.put(completionKey, + new ReadLacCompletion(readLacOpLogger, cb, ctx, ledgerId, + scheduleTimeout(completionKey, readEntryTimeout))); + // Build the request and calculate the total size to be included in the packet. 
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.READ_LAC) + .setTxnId(txnId); + ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder() + .setLedgerId(ledgerId); + final Request readLacRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setReadLacRequest(readLacBuilder) + .build(); + final Channel c = channel; + if (c == null) { + errorOutReadLacKey(completionKey); + return; + } + + try { + ChannelFuture future = c.write(readLacRequest); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + LOG.debug("Succssfully wrote request {} to {}", + readLacRequest, c.getRemoteAddress()); + } else { + if (!(future.getCause() instanceof ClosedChannelException)) { + LOG.warn("Writing readLac(lid = {}) to channel {} failed : ", + new Object[] { ledgerId, c, future.getCause() }); + } + errorOutReadLacKey(completionKey); + } + } + }); + } catch(Throwable e) { + LOG.warn("Read LAC operation {} failed", readLacRequest, e); + errorOutReadLacKey(completionKey); + } + } + public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) { final long txnId = getTxnId(); final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); @@ -649,6 +768,54 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan }); } + void errorOutWriteLacKey(final CompletionKey key) { + errorOutWriteLacKey(key, BKException.Code.BookieHandleNotAvailableException); + } + + void errorOutWriteLacKey(final CompletionKey key, final int rc) { + final WriteLacCompletion writeLacCompletion = (WriteLacCompletion)completionObjects.remove(key); + if (null == writeLacCompletion) { + return; + } + executor.submitOrdered(writeLacCompletion.ledgerId, new SafeRunnable() { + @Override + public void safeRun() { + 
String bAddress = "null"; + Channel c = channel; + if (c != null) { + bAddress = c.getRemoteAddress().toString(); + } + LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}", + new Object[] { writeLacCompletion.ledgerId, bAddress}); + writeLacCompletion.cb.writeLacComplete(rc, writeLacCompletion.ledgerId, addr, writeLacCompletion.ctx); + } + }); + } + + void errorOutReadLacKey(final CompletionKey key) { + errorOutReadLacKey(key, BKException.Code.BookieHandleNotAvailableException); + } + + void errorOutReadLacKey(final CompletionKey key, final int rc) { + final ReadLacCompletion readLacCompletion = (ReadLacCompletion)completionObjects.remove(key); + if (null == readLacCompletion) { + return; + } + executor.submitOrdered(readLacCompletion.ledgerId, new SafeRunnable() { + @Override + public void safeRun() { + String bAddress = "null"; + Channel c = channel; + if (c != null) { + bAddress = c.getRemoteAddress().toString(); + } + LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}", + new Object[] { readLacCompletion.ledgerId, bAddress}); + readLacCompletion.cb.readLacComplete(rc, readLacCompletion.ledgerId, null, null, readLacCompletion.ctx); + } + }); + } + void errorOutAddKey(final CompletionKey key) { errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException); } @@ -836,6 +1003,12 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan case READ_ENTRY: handleReadResponse(response, completionValue); break; + case WRITE_LAC: + handleWriteLacResponse(response.getWriteLacResponse(), completionValue); + break; + case READ_LAC: + handleReadLacResponse(response.getReadLacResponse(), completionValue); + break; default: LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", type, addr); @@ -853,7 +1026,26 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } - void handleAddResponse(Response response, CompletionValue completionValue) { + void 
handleWriteLacResponse(WriteLacResponse writeLacResponse, CompletionValue completionValue) { + // The completion value should always be an instance of an WriteLacCompletion object when we reach here. + WriteLacCompletion plc = (WriteLacCompletion)completionValue; + + long ledgerId = writeLacResponse.getLedgerId(); + StatusCode status = writeLacResponse.getStatus(); + + LOG.debug("Got response for writeLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); + + // convert to BKException code + Integer rcToRet = statusCodeToExceptionCode(status); + if (null == rcToRet) { + LOG.error("writeLac for ledger: " + ledgerId + " failed on bookie: " + addr + + " with code:" + status); + rcToRet = BKException.Code.WriteException; + } + plc.cb.writeLacComplete(rcToRet, ledgerId, addr, plc.ctx); + } + + void handleAddResponse(Response response, CompletionValue completionValue) { // The completion value should always be an instance of an AddCompletion object when we reach here. AddCompletion ac = (AddCompletion)completionValue; AddResponse addResponse = response.getAddResponse(); @@ -866,7 +1058,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + status); } - // convert to BKException code because thats what the uppper + // convert to BKException code because thats what the upper // layers expect. This is UGLY, there should just be one set of // error codes. Integer rcToRet = statusCodeToExceptionCode(status); @@ -880,6 +1072,36 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); } + void handleReadLacResponse(ReadLacResponse readLacResponse, CompletionValue completionValue) { + // The completion value should always be an instance of an WriteLacCompletion object when we reach here. 
+ ReadLacCompletion glac = (ReadLacCompletion)completionValue; + + long ledgerId = readLacResponse.getLedgerId(); + StatusCode status = readLacResponse.getStatus(); + ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); + ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); + + // Thread.dumpStack(); + + if (readLacResponse.hasLacBody()) { + lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); + } + + if (readLacResponse.hasLastEntryBody()) { + lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); + } + + LOG.debug("Got response for readLac request from bookie: " + addr + " for ledger: " + ledgerId + " rc: " + status); + // convert to BKException code + Integer rcToRet = statusCodeToExceptionCode(status); + if (null == rcToRet) { + LOG.debug("readLac for ledger: " + ledgerId + " failed on bookie: " + addr + + " with code:" + status); + rcToRet = BKException.Code.ReadException; + } + glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx); + } + void handleReadResponse(Response response, CompletionValue completionValue) { // The completion value should always be an instance of a ReadCompletion object when we reach here. 
ReadCompletion rc = (ReadCompletion)completionValue; @@ -940,6 +1162,63 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } // visible for testing + static class WriteLacCompletion extends CompletionValue { + final WriteLacCallback cb; + + public WriteLacCompletion(WriteLacCallback cb, Object ctx, long ledgerId) { + this(null, cb, ctx, ledgerId, null); + } + + public WriteLacCompletion(final OpStatsLogger writeLacOpLogger, final WriteLacCallback originalCallback, + final Object originalCtx, final long ledgerId, final Timeout timeout) { + super(originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout); + final long startTime = MathUtils.nowInNano(); + this.cb = null == writeLacOpLogger ? originalCallback : new WriteLacCallback() { + @Override + public void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) { + cancelTimeout(); + long latency = MathUtils.elapsedNanos(startTime); + if (rc != BKException.Code.OK) { + writeLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + } else { + writeLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + } + originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx); + } + }; + + } + } + + // visible for testing + static class ReadLacCompletion extends CompletionValue { + final ReadLacCallback cb; + + public ReadLacCompletion(ReadLacCallback cb, Object ctx, long ledgerId) { + this (null, cb, ctx, ledgerId, null); + } + + public ReadLacCompletion(final OpStatsLogger readLacOpLogger, final ReadLacCallback originalCallback, + final Object ctx, final long ledgerId, final Timeout timeout) { + super(ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, timeout); + final long startTime = MathUtils.nowInNano(); + this.cb = null == readLacOpLogger ? 
originalCallback : new ReadLacCallback() { + @Override + public void readLacComplete(int rc, long ledgerId, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, Object ctx) { + cancelTimeout(); + long latency = MathUtils.elapsedNanos(startTime); + if (rc != BKException.Code.OK) { + readLacOpLogger.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + } else { + readLacOpLogger.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + } + originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx); + } + }; + } + } + + // visible for testing static class ReadCompletion extends CompletionValue { final ReadEntryCallback cb; @@ -1070,11 +1349,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan if (OperationType.ADD_ENTRY == operationType) { errorOutAddKey(this, BKException.Code.TimeoutException); addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); - } else { + } else if (OperationType.READ_ENTRY == operationType) { errorOutReadKey(this, BKException.Code.TimeoutException); readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); + } else if (OperationType.WRITE_LAC == operationType) { + errorOutWriteLacKey(this, BKException.Code.TimeoutException); + writeLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); + } else { + errorOutReadLacKey(this, BKException.Code.TimeoutException); + readLacTimeoutOpLogger.registerSuccessfulEvent(elapsedTime(), TimeUnit.NANOSECONDS); } - } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java new file mode 100644 index 0000000..e9a4c13 --- /dev/null +++ 
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.ByteString;

/**
 * Handles a single protocol-V3 ReadLac request: reads the ledger's
 * {@code LAST_ADD_CONFIRMED} entry and its explicit LAC from the bookie and
 * sends both back to the client in a {@link ReadLacResponse}.
 */
class ReadLacProcessorV3 extends PacketProcessorBaseV3 {
    private static final Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);

    public ReadLacProcessorV3(Request request, Channel channel,
                              BookieRequestProcessor requestProcessor) {
        super(request, channel, requestProcessor);
    }

    /**
     * Builds the response for the ReadLac request held by this processor.
     *
     * <p>Status mapping:
     * <ul>
     *   <li>{@code EBADVERSION} - the request's protocol version is not compatible;</li>
     *   <li>{@code EOK} - both bodies were read and attached to the response;</li>
     *   <li>{@code ENOENTRY} - the bookie has no explicit LAC for the ledger
     *       (no body is attached, even though the last entry was read);</li>
     *   <li>{@code ENOLEDGER} - the bookie reported {@link Bookie.NoLedgerException};</li>
     *   <li>{@code EIO} - any other {@link IOException}.</li>
     * </ul>
     *
     * @return a fully built response; never {@code null}
     */
    private ReadLacResponse getReadLacResponse() {
        final long startTimeNanos = MathUtils.nowInNano();
        final ReadLacRequest readLacRequest = request.getReadLacRequest();
        final long ledgerId = readLacRequest.getLedgerId();

        final ReadLacResponse.Builder readLacResponse = ReadLacResponse.newBuilder().setLedgerId(ledgerId);

        if (!isVersionCompatible()) {
            // NOTE(review): this early return is not recorded in readLacStats,
            // unlike every other outcome below - confirm that is intended.
            readLacResponse.setStatus(StatusCode.EBADVERSION);
            return readLacResponse.build();
        }

        logger.debug("Received ReadLac request: {}", request);
        StatusCode status = StatusCode.EOK;
        try {
            // Read the last entry first so that a missing ledger is reported as
            // ENOLEDGER before the explicit LAC is looked up.
            final ByteBuffer lastEntry =
                    requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
            final ByteBuffer lac = requestProcessor.bookie.getExplicitLac(ledgerId);
            if (lac != null) {
                readLacResponse.setLacBody(ByteString.copyFrom(lac));
                readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry));
            } else {
                status = StatusCode.ENOENTRY;
            }
        } catch (Bookie.NoLedgerException e) {
            status = StatusCode.ENOLEDGER;
            // The exception is passed as the trailing argument so the stack trace is logged.
            logger.error("No ledger found while performing readLac from ledger: {}",
                         new Object[] { ledgerId, e });
        } catch (IOException e) {
            status = StatusCode.EIO;
            logger.error("IOException while performing readLac from ledger: {}",
                         new Object[] { ledgerId, e });
        }

        final long elapsedNanos = MathUtils.elapsedNanos(startTimeNanos);
        if (status == StatusCode.EOK) {
            requestProcessor.readLacStats.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        } else {
            requestProcessor.readLacStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        }
        // Finally set the status and return
        readLacResponse.setStatus(status);
        return readLacResponse.build();
    }

    /** Computes the ReadLac response and writes it back to the client. */
    @Override
    public void safeRun() {
        sendResponse(getReadLacResponse());
    }

    /**
     * Wraps the ReadLac payload in a protocol {@link Response}, mirroring its
     * status at the top level, and hands it to the base-class sender.
     *
     * @param readLacResponse the payload built by {@link #getReadLacResponse()}
     */
    private void sendResponse(ReadLacResponse readLacResponse) {
        Response.Builder response = Response.newBuilder()
                .setHeader(getHeader())
                .setStatus(readLacResponse.getStatus())
                .setReadLacResponse(readLacResponse);
        // NOTE(review): the generic readRequestStats logger is passed here while the
        // operation outcome above is recorded on readLacStats - confirm which
        // logger the base-class sendResponse is expected to receive for ReadLac.
        sendResponse(response.getStatus(),
                     response.build(),
                     requestProcessor.readRequestStats);
    }
}
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles a single protocol-V3 WriteLac request: hands the request body and
 * master key to the bookie's {@code setExplicitLac} and reports the outcome to
 * the client in a {@link WriteLacResponse}.
 */
class WriteLacProcessorV3 extends PacketProcessorBaseV3 {
    private static final Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class);

    public WriteLacProcessorV3(Request request, Channel channel,
                             BookieRequestProcessor requestProcessor) {
        super(request, channel, requestProcessor);
    }

    /**
     * Applies the WriteLac request to the bookie and builds the response.
     *
     * <p>Status mapping:
     * <ul>
     *   <li>{@code EBADVERSION} - the request's protocol version is not compatible;</li>
     *   <li>{@code EREADONLY} - the bookie is running in read-only mode;</li>
     *   <li>{@code EOK} - {@code setExplicitLac} completed;</li>
     *   <li>{@code EIO} - {@code setExplicitLac} threw {@link IOException};</li>
     *   <li>{@code EUA} - {@code setExplicitLac} threw {@link BookieException};</li>
     *   <li>{@code EBADREQ} - any other throwable.</li>
     * </ul>
     *
     * @return a fully built response; never {@code null}
     */
    private WriteLacResponse getWriteLacResponse() {
        final long startTimeNanos = MathUtils.nowInNano();
        final WriteLacRequest writeLacRequest = request.getWriteLacRequest();
        final long lac = writeLacRequest.getLac();
        final long ledgerId = writeLacRequest.getLedgerId();

        final WriteLacResponse.Builder writeLacResponse = WriteLacResponse.newBuilder().setLedgerId(ledgerId);

        if (!isVersionCompatible()) {
            writeLacResponse.setStatus(StatusCode.EBADVERSION);
            return writeLacResponse.build();
        }

        if (requestProcessor.bookie.isReadOnly()) {
            logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
            writeLacResponse.setStatus(StatusCode.EREADONLY);
            return writeLacResponse.build();
        }

        final ByteBuffer lacToAdd = writeLacRequest.getBody().asReadOnlyByteBuffer();
        final byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();

        // Assigned exactly once on every path through the try/catch below.
        final StatusCode status = applyExplicitLac(lacToAdd, masterKey, lac, ledgerId);

        // Record the outcome; a response is built and returned in every case.
        final long elapsedNanos = MathUtils.elapsedNanos(startTimeNanos);
        if (status == StatusCode.EOK) {
            requestProcessor.writeLacStats.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        } else {
            requestProcessor.writeLacStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        }
        writeLacResponse.setStatus(status);
        return writeLacResponse.build();
    }

    /**
     * Calls the bookie's {@code setExplicitLac} and translates the outcome into a status code.
     * The caught exception is always the last logging argument so that its
     * stack trace is emitted, and every placeholder has a matching value.
     */
    private StatusCode applyExplicitLac(ByteBuffer lacToAdd, byte[] masterKey, long lac, long ledgerId) {
        try {
            requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey);
            return StatusCode.EOK;
        } catch (IOException e) {
            logger.error("Error saving lac:{} for ledger:{}",
                         new Object[] { lac, ledgerId, e });
            return StatusCode.EIO;
        } catch (BookieException e) {
            logger.error("Unauthorized access to ledger:{} while adding lac:{}",
                         new Object[] { ledgerId, lac, e });
            return StatusCode.EUA;
        } catch (Throwable t) {
            // Deliberately broad: a malformed request must still get a response.
            logger.error("Unexpected exception while writing {}@{} : ",
                         new Object[] { lac, ledgerId, t });
            // some bad request which cause unexpected exception
            return StatusCode.EBADREQ;
        }
    }

    /** Computes the WriteLac response and writes it back to the client. */
    @Override
    public void safeRun() {
        final WriteLacResponse writeLacResponse = getWriteLacResponse();
        final Response resp = Response.newBuilder()
                .setHeader(getHeader())
                .setStatus(writeLacResponse.getStatus())
                .setWriteLacResponse(writeLacResponse)
                .build();
        // NOTE(review): writeLacStats is also updated in getWriteLacResponse();
        // if the base-class sendResponse registers an event on the logger it is
        // given, successful/failed writes are counted twice - confirm.
        sendResponse(writeLacResponse.getStatus(), resp, requestProcessor.writeLacStats);
    }
}
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java @@ -21,12 +21,15 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,9 +61,8 @@ import org.slf4j.LoggerFactory; public class OrderedSafeExecutor { final static long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1); final String name; - final ThreadPoolExecutor threads[]; + final ScheduledThreadPoolExecutor threads[]; final long threadIds[]; - final BlockingQueue<Runnable> queues[]; final Random rand = new Random(); final OpStatsLogger taskExecutionStats; final OpStatsLogger taskPendingStats; @@ -173,17 +175,15 @@ public class OrderedSafeExecutor { this.warnTimeMicroSec = warnTimeMicroSec; name = baseName; - threads = new ThreadPoolExecutor[numThreads]; + threads = new ScheduledThreadPoolExecutor[numThreads]; threadIds = new long[numThreads]; - queues = new BlockingQueue[numThreads]; for (int i = 0; i < numThreads; i++) { - queues[i] = new LinkedBlockingQueue<Runnable>(); - threads[i] = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, queues[i], + threads[i] = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat(name + "-orderedsafeexecutor-" + i + "-%d") 
.setThreadFactory(threadFactory) .build()); + threads[i].setMaximumPoolSize(1); // Save thread ids final int idx = i; @@ -209,7 +209,7 @@ public class OrderedSafeExecutor { @Override public Number getSample() { - return queues[idx].size(); + return threads[idx].getQueue().size(); } }); statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge<Number>() { @@ -242,7 +242,7 @@ public class OrderedSafeExecutor { this.traceTaskExecution = traceTaskExecution; } - ExecutorService chooseThread() { + ScheduledExecutorService chooseThread() { // skip random # generation in this special case if (threads.length == 1) { return threads[0]; @@ -252,7 +252,7 @@ public class OrderedSafeExecutor { } - ExecutorService chooseThread(Object orderingKey) { + ScheduledExecutorService chooseThread(Object orderingKey) { // skip hashcode generation in this special case if (threads.length == 1) { return threads[0]; @@ -286,6 +286,104 @@ public class OrderedSafeExecutor { chooseThread(orderingKey).submit(timedRunnable(r)); } + /** + * Creates and executes a one-shot action that becomes enabled after the given delay. + * + * @param command - the SafeRunnable to execute + * @param delay - the time from now to delay execution + * @param unit - the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion + */ + public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) { + return chooseThread().schedule(command, delay, unit); + } + + /** + * Creates and executes a one-shot action that becomes enabled after the given delay. 
+ * + * @param orderingKey - the key used for ordering + * @param command - the SafeRunnable to execute + * @param delay - the time from now to delay execution + * @param unit - the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of the task and whose get() method will return null upon completion + */ + public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) { + return chooseThread(orderingKey).schedule(command, delay, unit); + } + + /** + * Creates and executes a periodic action that becomes enabled first after + * the given initial delay, and subsequently with the given period; + * + * For more details check scheduleAtFixedRate in interface ScheduledExecutorService + * + * @param command - the SafeRunnable to execute + * @param initialDelay - the time to delay first execution + * @param period - the period between successive executions + * @param unit - the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of the task, and whose get() + * method will throw an exception upon cancellation + */ + public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) { + return chooseThread().scheduleAtFixedRate(command, initialDelay, period, unit); + } + + /** + * Creates and executes a periodic action that becomes enabled first after + * the given initial delay, and subsequently with the given period; + * + * For more details check scheduleAtFixedRate in interface ScheduledExecutorService + * + * @param orderingKey - the key used for ordering + * @param command - the SafeRunnable to execute + * @param initialDelay - the time to delay first execution + * @param period - the period between successive executions + * @param unit - the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of the task, and 
whose get() method + * will throw an exception upon cancellation + */ + public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay, + long period, TimeUnit unit) { + return chooseThread(orderingKey).scheduleAtFixedRate(command, initialDelay, period, unit); + } + + /** + * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently + * with the given delay between the termination of one execution and the commencement of the next. + * + * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService + * + * @param command - the SafeRunnable to execute + * @param initialDelay - the time to delay first execution + * @param delay - the delay between the termination of one execution and the commencement of the next + * @param unit - the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of the task, and whose get() method + * will throw an exception upon cancellation + */ + public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay, + TimeUnit unit) { + return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + /** + * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently + * with the given delay between the termination of one execution and the commencement of the next. 
+ * + * For more details check scheduleWithFixedDelay in interface ScheduledExecutorService + * + * @param orderingKey - the key used for ordering + * @param command - the SafeRunnable to execute + * @param initialDelay - the time to delay first execution + * @param delay - the delay between the termination of one execution and the commencement of the next + * @param unit - the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of the task, and whose get() method + * will throw an exception upon cancellation + */ + public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay, + long delay, TimeUnit unit) { + return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + private long getThreadID(Object orderingKey) { // skip hashcode generation in this special case if (threadIds.length == 1) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto index aabf80b..9ce9baf 100644 --- a/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto +++ b/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto @@ -57,6 +57,8 @@ enum OperationType { RANGE_ADD_ENTRY = 4; AUTH = 5; + WRITE_LAC = 6; + READ_LAC = 7; } /** @@ -74,6 +76,8 @@ message Request { optional ReadRequest readRequest = 100; optional AddRequest addRequest = 101; optional AuthMessage authRequest = 102; + optional WriteLacRequest writeLacRequest = 103; + optional ReadLacRequest readLacRequest = 104; } message ReadRequest { @@ -99,6 +103,17 @@ message AddRequest { required bytes body = 4; } +message WriteLacRequest { + required int64 ledgerId = 1; + required int64 lac = 2; + required bytes 
masterKey = 3; + required bytes body = 4; +} + +message ReadLacRequest { + required int64 ledgerId = 1; +} + message Response { required BKPacketHeader header = 1; @@ -109,6 +124,8 @@ message Response { optional ReadResponse readResponse = 100; optional AddResponse addResponse = 101; optional AuthMessage authResponse = 102; + optional WriteLacResponse writeLacResponse = 103; + optional ReadLacResponse readLacResponse = 104; } message ReadResponse { @@ -127,4 +144,16 @@ message AddResponse { message AuthMessage { required string authPluginName = 1; required bytes payload = 2; -} \ No newline at end of file +} + +message WriteLacResponse { + required StatusCode status = 1; + required int64 ledgerId = 2; +} + +message ReadLacResponse { + required StatusCode status = 1; + required int64 ledgerId = 2; + optional bytes lacBody = 3; // lac sent by WriteLacRequest + optional bytes lastEntryBody = 4; // Actual last entry on the disk +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 988c2a2..1ce30e9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -334,6 +334,15 @@ public class TestSyncThread { } @Override + public void setExplicitlac(long ledgerId, ByteBuffer lac) { + } + + @Override + public ByteBuffer getExplicitLac(long ledgerId) { + return null; + } + + @Override public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { return checkpoint; 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index e87fdc0..a2532c9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -309,4 +309,128 @@ public class BookKeeperTest extends BaseTestCase { } Assert.assertTrue("BookKeeper should be closed!", _bkc.closed); } + + @Test(timeout = 60000) + public void testReadHandleWithNoExplicitLAC() throws Exception { + ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()); + confWithNoExplicitLAC.setExplictLacInterval(0); + + BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC); + + LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(digestType, "testPasswd".getBytes()); + long ledgerId = wlh.getId(); + int numOfEntries = 5; + for (int i = 0; i < numOfEntries; i++) { + wlh.addEntry(("foobar" + i).getBytes()); + } + + LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes()); + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2); + int entryId = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String entryString = new String(entry.getEntry()); + Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString, + entryString.equals("foobar" + entryId)); 
+ entryId++; + } + + for (int i = numOfEntries; i < 2 * numOfEntries; i++) { + wlh.addEntry(("foobar" + i).getBytes()); + } + + Thread.sleep(3000); + // since explicitlacflush policy is not enabled for writeledgerhandle, when we try + // to read explicitlac for rlh, it will be LedgerHandle.INVALID_ENTRY_ID. But it + // won't throw an exception. + long explicitlac = rlh.readExplicitLastConfirmed(); + Assert.assertTrue( + "Expected Explicit LAC of rlh: " + LedgerHandle.INVALID_ENTRY_ID + " actual ExplicitLAC of rlh: " + explicitlac, + (explicitlac == LedgerHandle.INVALID_ENTRY_ID)); + Assert.assertTrue( + "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(), + (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1))); + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + try { + rlh.readEntries(numOfEntries - 1, numOfEntries - 1); + fail("rlh readEntries beyond " + (numOfEntries - 2) + " should fail with ReadException"); + } catch (BKException.BKReadException readException) { + } + + rlh.close(); + wlh.close(); + bkcWithNoExplicitLAC.close(); + } + + @Test(timeout = 60000) + public void testReadHandleWithExplicitLAC() throws Exception { + ClientConfiguration confWithExplicitLAC = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()); + int explictLacInterval = 1; + confWithExplicitLAC.setExplictLacInterval(explictLacInterval); + + BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC); + + LedgerHandle wlh = bkcWithExplicitLAC.createLedger(digestType, "testPasswd".getBytes()); + long ledgerId = wlh.getId(); + int numOfEntries = 5; + for (int i = 0; i < numOfEntries; i++) { + wlh.addEntry(("foobar" + i).getBytes()); + } + + LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes()); + + Assert.assertTrue( + 
"Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + for (int i = numOfEntries; i < 2 * numOfEntries; i++) { + wlh.addEntry(("foobar" + i).getBytes()); + } + + // we need to wait for at least 2 explicitlacintervals, + // since in writehandle for the first call + // lh.getExplicitLastAddConfirmed() will be < + // lh.getPiggyBackedLastAddConfirmed(), + // so it won't make explicit writelac in the first run + Thread.sleep((2 * explictLacInterval + 1) * 1000); + Assert.assertTrue( + "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(), + (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1))); + // readhandle's lastaddconfirmed won't be updated until readExplicitLastConfirmed call is made + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + long explicitlac = rlh.readExplicitLastConfirmed(); + Assert.assertTrue( + "Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + " actual ExplicitLAC of rlh: " + explicitlac, + (explicitlac == (2 * numOfEntries - 1))); + // readExplicitLastConfirmed updates the lac of rlh. 
+ Assert.assertTrue( + "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1))); + + Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1); + int entryId = numOfEntries; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String entryString = new String(entry.getEntry()); + Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString, + entryString.equals("foobar" + entryId)); + entryId++; + } + + rlh.close(); + wlh.close(); + bkcWithExplicitLAC.close(); + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/c813b3d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 5387424..4c2ddaa 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -206,5 +206,17 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { @Override public void flushEntriesLocationsIndex() throws IOException { } + + @Override + public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public ByteBuffer getExplicitLac(long ledgerId) { + // TODO Auto-generated method stub + return null; + } } }
