http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 7db620d..99d257b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -17,9 +17,36 @@ */ package org.apache.bookkeeper.proto; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + + import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Collections; @@ -43,6 +70,8 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; @@ -59,52 +88,22 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; + import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory; -import org.jboss.netty.channel.local.LocalClientChannelFactory; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.handler.codec.frame.CorruptedFrameException; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; +import org.apache.commons.lang.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; import com.google.protobuf.ExtensionRegistry; import java.net.SocketAddress; import org.apache.bookkeeper.auth.BookKeeperPrincipal; -import org.jboss.netty.channel.ChannelFactory; import org.apache.bookkeeper.client.ClientConnectionPeer; /** @@ -112,7 +111,8 @@ import org.apache.bookkeeper.client.ClientConnectionPeer; * has reconnect logic if a connection to a bookie fails. * */ -public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory { +@Sharable +public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class); @@ -129,7 +129,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public static final AtomicLong txnIdGenerator = new AtomicLong(0); final BookieSocketAddress addr; - final ChannelFactory channelFactory; + final EventLoopGroup eventLoopGroup; final OrderedSafeExecutor executor; final HashedWheelTimer requestTimer; final int addEntryTimeout; @@ -175,21 +175,22 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private final ClientAuthProvider.Factory authProviderFactory; private final ExtensionRegistry extRegistry; - public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, + public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr) { - this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, null, null, null); + this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, NullStatsLogger.INSTANCE, null, null, + null); } - public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, + public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry) { - this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE, + this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, NullStatsLogger.INSTANCE, authProviderFactory, extRegistry, null); } public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, - ClientSocketChannelFactory channelFactory, BookieSocketAddress addr, + EventLoopGroup eventLoopGroup, BookieSocketAddress addr, HashedWheelTimer requestTimer, StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, @@ -198,10 +199,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan this.conf = conf; this.addr = addr; this.executor = executor; - if (LocalBookiesRegistry.isLocalBookie(addr)){ - this.channelFactory = new DefaultLocalClientChannelFactory(); + if (LocalBookiesRegistry.isLocalBookie(addr)) { + this.eventLoopGroup = new DefaultEventLoopGroup(); } else { - this.channelFactory = channelFactory; + this.eventLoopGroup = eventLoopGroup; } this.state = ConnectionState.DISCONNECTED; this.requestTimer = requestTimer; @@ -239,7 +240,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public SocketAddress getRemoteAddr() { Channel c = channel; if (c != null) { - return c.getRemoteAddress(); + return c.remoteAddress(); } else { return null; } @@ -287,56 +288,94 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } - private void connect() { + protected ChannelFuture connect() { LOG.debug("Connecting to bookie: {}", addr); - // Set up the ClientBootStrap so we can create a new Channel connection - // to the bookie. - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - bootstrap.setPipelineFactory(this); - bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay()); - bootstrap.setOption("keepAlive", true); - bootstrap.setOption("connectTimeoutMillis", conf.getClientConnectTimeoutMillis()); - bootstrap.setOption("child.sendBufferSize", conf.getClientSendBufferSize()); - bootstrap.setOption("child.receiveBufferSize", conf.getClientReceiveBufferSize()); - bootstrap.setOption("writeBufferLowWaterMark", conf.getClientWriteBufferLowWaterMark()); - bootstrap.setOption("writeBufferHighWaterMark", conf.getClientWriteBufferHighWaterMark()); - SocketAddress bookieAddr = addr.getSocketAddress(); - if (channelFactory instanceof LocalClientChannelFactory) { + // Set up the ClientBootStrap so we can create a new Channel connection to the bookie. + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup); + if (eventLoopGroup instanceof EpollEventLoopGroup) { + bootstrap.channel(EpollSocketChannel.class); + } else if (eventLoopGroup instanceof DefaultEventLoopGroup) { + bootstrap.channel(LocalChannel.class); + } else { + bootstrap.channel(NioSocketChannel.class); + } + + bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis()); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( + conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark())); + + if (!(eventLoopGroup instanceof DefaultEventLoopGroup)) { + bootstrap.option(ChannelOption.TCP_NODELAY, conf.getClientTcpNoDelay()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, conf.getClientSockKeepalive()); + + // if buffer sizes are 0, let OS auto-tune it + if (conf.getClientSendBufferSize() > 0) { + bootstrap.option(ChannelOption.SO_SNDBUF, conf.getClientSendBufferSize()); + } + + if (conf.getClientReceiveBufferSize() > 0) { + bootstrap.option(ChannelOption.SO_RCVBUF, conf.getClientReceiveBufferSize()); + } + } + + // In the netty pipeline, we need to split packets based on length, so we + // use the {@link LengthFieldBasedFramDecoder}. Other than that all actions + // are carried out in this class, e.g., making sense of received messages, + // prepending the length to outgoing packets etc. + bootstrap.handler(new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("lengthbasedframedecoder", + new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); + pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); + pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry)); + pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry)); + pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator, connectionPeer)); + pipeline.addLast("mainhandler", PerChannelBookieClient.this); + } + }); + + SocketAddress bookieAddr = addr.getSocketAddress(); + if (eventLoopGroup instanceof DefaultEventLoopGroup) { bookieAddr = addr.getLocalAddress(); } + ChannelFuture future = bootstrap.connect(bookieAddr); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.getChannel()); + LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel()); int rc; Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps; synchronized (PerChannelBookieClient.this) { if (future.isSuccess() && state == ConnectionState.CONNECTING) { - LOG.info("Successfully connected to bookie: {}", future.getChannel()); + LOG.info("Successfully connected to bookie: {}", future.channel()); rc = BKException.Code.OK; - channel = future.getChannel(); + channel = future.channel(); state = ConnectionState.CONNECTED; } else if (future.isSuccess() && (state == ConnectionState.CLOSED || state == ConnectionState.DISCONNECTED)) { LOG.warn("Closed before connection completed, clean up: {}, current state {}", - future.getChannel(), state); - closeChannel(future.getChannel()); + future.channel(), state); + closeChannel(future.channel()); rc = BKException.Code.BookieHandleNotAvailableException; channel = null; } else if (future.isSuccess() && state == ConnectionState.CONNECTED) { LOG.debug("Already connected with another channel({}), so close the new channel({})", - channel, future.getChannel()); - closeChannel(future.getChannel()); + channel, future.channel()); + closeChannel(future.channel()); return; // pendingOps should have been completed when other channel connected } else { LOG.error("Could not connect to bookie: {}/{}, current state {} : ", - new Object[] { future.getChannel(), addr, - state, future.getCause() }); + new Object[] { future.channel(), addr, state, future.cause() }); rc = BKException.Code.BookieHandleNotAvailableException; - closeChannel(future.getChannel()); + closeChannel(future.channel()); channel = null; if (state != ConnectionState.CLOSED) { state = ConnectionState.DISCONNECTED; @@ -356,6 +395,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } } }); + + return future; } void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) { @@ -402,7 +443,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } - void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ChannelBuffer toSend, WriteLacCallback cb, Object ctx) { + void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBuf toSend, WriteLacCallback cb, + Object ctx) { final long txnId = getTxnId(); final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.WRITE_LAC); // writeLac is mostly like addEntry hence uses addEntryTimeout @@ -418,7 +460,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan .setLedgerId(ledgerId) .setLac(lac) .setMasterKey(ByteString.copyFrom(masterKey)) - .setBody(ByteString.copyFrom(toSend.toByteBuffer())); + .setBody(ByteString.copyFrom(toSend.nioBuffer())); final Request writeLacRequest = Request.newBuilder() .setHeader(headerBuilder) @@ -431,19 +473,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan return; } try { - ChannelFuture future = c.write(writeLacRequest); + ChannelFuture future = c.writeAndFlush(writeLacRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully wrote request for writeLac LedgerId: {} bookie: {}", - ledgerId, c.getRemoteAddress()); + ledgerId, c.remoteAddress()); } } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing Lac(lid={} to channel {} failed : ", - new Object[] { ledgerId, c, future.getCause() }); + new Object[] { ledgerId, c, future.cause() }); } errorOutWriteLacKey(completionKey); } @@ -474,7 +516,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan * @param options * Add options */ - void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb, + void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf toSend, WriteCallback cb, Object ctx, final int options) { Object request = null; CompletionKey completion = null; @@ -482,8 +524,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan completion = new V2CompletionKey(ledgerId, entryId, OperationType.ADD_ENTRY); request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, (short) options, masterKey, toSend); - - } else { final long txnId = getTxnId(); completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY); @@ -493,15 +533,18 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan .setOperation(OperationType.ADD_ENTRY) .setTxnId(txnId); + byte[] toSendArray = new byte[toSend.readableBytes()]; + toSend.getBytes(toSend.readerIndex(), toSendArray); AddRequest.Builder addBuilder = AddRequest.newBuilder() .setLedgerId(ledgerId) .setEntryId(entryId) .setMasterKey(ByteString.copyFrom(masterKey)) - .setBody(ByteString.copyFrom(toSend.toByteBuffer())); + .setBody(ByteString.copyFrom(toSendArray)); if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); } + request = Request.newBuilder() .setHeader(headerBuilder) .setAddRequest(addBuilder) @@ -519,23 +562,24 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final Channel c = channel; if (c == null) { errorOutAddKey(completionKey); + toSend.release(); return; } try { - ChannelFuture future = c.write(addRequest); + ChannelFuture future = c.writeAndFlush(addRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId - + " bookie: " + c.getRemoteAddress() + " entry length: " + entrySize); + + " bookie: " + c.remoteAddress() + " entry length: " + entrySize); } // totalBytesOutstanding.addAndGet(entrySize); } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing addEntry(lid={}, eid={}) to channel {} failed : ", - new Object[] { ledgerId, entryId, c, future.getCause() }); + new Object[] { ledgerId, entryId, c, future.cause() }); } errorOutAddKey(completionKey); } @@ -594,19 +638,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final Object readRequest = request; try { - ChannelFuture future = c.write(readRequest); + ChannelFuture future = c.writeAndFlush(readRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully wrote request {} to {}", - readRequest, c.getRemoteAddress()); + readRequest, c.remoteAddress()); } } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ", - new Object[] { ledgerId, entryId, c, future.getCause() }); + new Object[] { ledgerId, entryId, c, future.cause() }); } errorOutReadKey(completionKey); } @@ -623,8 +667,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan CompletionKey completion = null; if (useV2WireProtocol) { request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - ledgerId, (long) 0, (short) 0); - completion = new V2CompletionKey(ledgerId, (long) 0, OperationType.READ_LAC); + ledgerId, 0, (short) 0); + completion = new V2CompletionKey(ledgerId, 0, OperationType.READ_LAC); } else { final long txnId = getTxnId(); completion = new V3CompletionKey(txnId, OperationType.READ_LAC); @@ -654,17 +698,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } try { - ChannelFuture future = c.write(readLacRequest); + ChannelFuture future = c.writeAndFlush(readLacRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { LOG.debug("Succssfully wrote request {} to {}", - readLacRequest, c.getRemoteAddress()); + readLacRequest, c.remoteAddress()); } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing readLac(lid = {}) to channel {} failed : ", - new Object[] { ledgerId, c, future.getCause() }); + new Object[] { ledgerId, c, future.cause() }); } errorOutReadLacKey(completionKey); } @@ -715,19 +759,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } try{ - ChannelFuture future = c.write(readRequest); + ChannelFuture future = c.writeAndFlush(readRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully wrote request {} to {}", - readRequest, c.getRemoteAddress()); + readRequest, c.remoteAddress()); } } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ", - new Object[] { ledgerId, entryId, c, future.getCause() }); + new Object[] { ledgerId, entryId, c, future.cause() }); } errorOutReadKey(completionKey); } @@ -767,19 +811,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } try{ - ChannelFuture future = c.write(getBookieInfoRequest); + ChannelFuture future = c.writeAndFlush(getBookieInfoRequest); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully wrote request {} to {}", - getBookieInfoRequest, c.getRemoteAddress()); + getBookieInfoRequest, c.remoteAddress()); } } else { - if (!(future.getCause() instanceof ClosedChannelException)) { + if (!(future.cause() instanceof ClosedChannelException)) { LOG.warn("Writing GetBookieInfoRequest(flags={}) to channel {} failed : ", - new Object[] { requested, c, future.getCause() }); + new Object[] { requested, c, future.cause() }); } errorOutReadKey(completionKey); } @@ -846,10 +890,6 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan private ChannelFuture closeChannel(Channel c) { LOG.debug("Closing channel {}", c); - ReadTimeoutHandler timeout = c.getPipeline().get(ReadTimeoutHandler.class); - if (timeout != null) { - timeout.releaseExternalResources(); - } return c.close(); } @@ -868,8 +908,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void safeRun() { String bAddress = "null"; Channel c = channel; - if (c != null && c.getRemoteAddress() != null) { - bAddress = c.getRemoteAddress().toString(); + if (c != null && c.remoteAddress() != null) { + bAddress = c.remoteAddress().toString(); } LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {} rc: {}", @@ -902,7 +942,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan String bAddress = "null"; Channel c = channel; if (c != null) { - bAddress = c.getRemoteAddress().toString(); + bAddress = c.remoteAddress().toString(); } LOG.debug("Could not write request writeLac for ledgerId: {} bookie: {}", new Object[] { writeLacCompletion.ledgerId, bAddress}); @@ -927,7 +967,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan String bAddress = "null"; Channel c = channel; if (c != null) { - bAddress = c.getRemoteAddress().toString(); + bAddress = c.remoteAddress().toString(); } LOG.debug("Could not write request readLac for ledgerId: {} bookie: {}", new Object[] { readLacCompletion.ledgerId, bAddress}); @@ -950,8 +990,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan public void safeRun() { String bAddress = "null"; Channel c = channel; - if(c != null && c.getRemoteAddress() != null) { - bAddress = c.getRemoteAddress().toString(); + if (c != null && c.remoteAddress() != null) { + bAddress = c.remoteAddress().toString(); } LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {} rc: {}", new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress, rc }); @@ -983,7 +1023,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan String bAddress = "null"; Channel c = channel; if (c != null) { - bAddress = c.getRemoteAddress().toString(); + bAddress = c.remoteAddress().toString(); } LOG.debug("Could not write getBookieInfo request for bookie: {}", new Object[] {bAddress}); getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new BookieInfo(), getBookieInfoCompletion.ctx); @@ -1027,40 +1067,19 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } /** - * In the netty pipeline, we need to split packets based on length, so we - * use the {@link LengthFieldBasedFrameDecoder}. Other than that all actions - * are carried out in this class, e.g., making sense of received messages, - * prepending the length to outgoing packets etc. - */ - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - - pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry)); - pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry)); - pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator, - connectionPeer)); - pipeline.addLast("mainhandler", this); - return pipeline; - } - - /** * If our channel has disconnected, we just error out the pending entries */ @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - Channel c = ctx.getChannel(); - LOG.info("Disconnected from bookie channel {}", c); - if (c != null) { - closeChannel(c); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Disconnected from bookie channel {}", ctx.channel()); + if (ctx.channel() != null) { + closeChannel(ctx.channel()); } errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException); synchronized (this) { - if (this.channel == c + if (this.channel == ctx.channel() && state != ConnectionState.CLOSED) { state = ConnectionState.DISCONNECTED; } @@ -1075,56 +1094,60 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan * (mostly due to what we do in the netty threads) */ @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - Throwable t = e.getCause(); - if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) { - LOG.error("Corrupted frame received from bookie: {}", - e.getChannel().getRemoteAddress()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) { + LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress()); + ctx.close(); return; } - if (t instanceof AuthHandler.AuthenticationException) { - LOG.error("Error authenticating connection", t); + if (cause instanceof AuthHandler.AuthenticationException) { + LOG.error("Error authenticating connection", cause); errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException); - Channel c = ctx.getChannel(); + Channel c = ctx.channel(); if (c != null) { closeChannel(c); } return; } - if (t instanceof IOException) { + if (cause instanceof IOException) { // these are thrown when a bookie fails, logging them just pollutes // the logs (the failure is logged from the listeners on the write // operation), so I'll just ignore it here. + ctx.close(); return; } synchronized (this) { if (state == ConnectionState.CLOSED) { - LOG.debug("Unexpected exception caught by bookie client channel handler, " - + "but the client is closed, so it isn't important", t); + if (LOG.isDebugEnabled()) { + LOG.debug("Unexpected exception caught by bookie client channel handler, " + + "but the client is closed, so it isn't important", cause); + } } else { - LOG.error("Unexpected exception caught by bookie client channel handler", t); + LOG.error("Unexpected exception caught by bookie client channel handler", cause); } } + // Since we are a library, cant terminate App here, can we? + ctx.close(); } /** * Called by netty when a message is received on a channel */ @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (e.getMessage() instanceof BookieProtocol.Response) { - BookieProtocol.Response response = (BookieProtocol.Response) e.getMessage(); + if (msg instanceof BookieProtocol.Response) { + BookieProtocol.Response response = (BookieProtocol.Response) msg; readV2Response(response); - } else if (e.getMessage() instanceof Response) { - Response response = (Response) e.getMessage(); + } else if (msg instanceof Response) { + Response response = (Response) msg; readV3Response(response); } else { - ctx.sendUpstream(e); + ctx.fireChannelRead(msg); } } @@ -1156,7 +1179,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } case READ_ENTRY: { BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response; - ChannelBuffer data = null; + ByteBuf data = null; if (readResponse.hasData()) { data = readResponse.getData(); } @@ -1244,9 +1267,9 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan case READ_ENTRY: { ReadResponse readResponse = response.getReadResponse(); StatusCode status = response.getStatus() == StatusCode.EOK ? readResponse.getStatus() : response.getStatus(); - ChannelBuffer buffer = ChannelBuffers.buffer(0); + ByteBuf buffer = Unpooled.EMPTY_BUFFER; if (readResponse.hasBody()) { - buffer = ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); + buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer()); } handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, completionValue); break; @@ -1259,17 +1282,17 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan } case READ_LAC: { ReadLacResponse readLacResponse = response.getReadLacResponse(); - ChannelBuffer lacBuffer = ChannelBuffers.buffer(0); - ChannelBuffer lastEntryBuffer = ChannelBuffers.buffer(0); + ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER; + ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER; StatusCode status = response.getStatus() == StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus(); // Thread.dumpStack(); if (readLacResponse.hasLacBody()) { - lacBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); + lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer()); } if (readLacResponse.hasLastEntryBody()) { - lastEntryBuffer = ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); + lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer()); } handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, lastEntryBuffer, completionValue); break; @@ -1335,7 +1358,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); } - void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) { + void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) { // The completion value should always be an instance of an WriteLacCompletion object when we reach here. ReadLacCompletion glac = (ReadLacCompletion)completionValue; @@ -1350,11 +1373,10 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), glac.ctx); } - void handleReadResponse(long ledgerId, long entryId, StatusCode status, ChannelBuffer buffer, CompletionValue completionValue) { + void handleReadResponse(long ledgerId, long entryId, StatusCode status, ByteBuf buffer, CompletionValue completionValue) { // The completion value should always be an instance of a ReadCompletion object when we reach here. ReadCompletion rc = (ReadCompletion)completionValue; - if (LOG.isDebugEnabled()) { LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes()); @@ -1466,7 +1488,8 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final long startTime = MathUtils.nowInNano(); this.cb = null == readLacOpLogger ? originalCallback : new ReadLacCallback() { @Override - public void readLacComplete(int rc, long ledgerId, ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, Object ctx) { + public void readLacComplete(int rc, long ledgerId, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, + Object ctx) { cancelTimeout(); long latency = MathUtils.elapsedNanos(startTime); if (rc != BKException.Code.OK) { @@ -1497,7 +1520,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan final long startTime = MathUtils.nowInNano(); this.cb = new ReadEntryCallback() { @Override - public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) { cancelTimeout(); if (readEntryOpLogger != null) { long latency = MathUtils.elapsedNanos(startTime);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java index 4a735f9..b7618e3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java @@ -38,7 +38,7 @@ interface PerChannelBookieClientPool { * @param callback * callback to return channel from channel pool. */ - void obtain(GenericCallback<PerChannelBookieClient> callback); + void obtain(GenericCallback<PerChannelBookieClient> callback, long key); /** * record any read/write error on {@link PerChannelBookieClientPool} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 43360fa..bd98374 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -17,6 +17,8 @@ */ package org.apache.bookkeeper.proto; +import io.netty.channel.Channel; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; @@ -28,7 +30,6 @@ import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,7 @@ class ReadEntryProcessor extends PacketProcessorBase { try { Future<Boolean> fenceResult = null; if (read.isFencingRequest()) { - LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + channel.getRemoteAddress()); + LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(), channel.remoteAddress()); if (read.hasMasterKey()) { fenceResult = requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey()); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index b9037c1..fbfa71f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -17,6 +17,8 @@ */ package org.apache.bookkeeper.proto; +import io.netty.channel.Channel; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; @@ -32,7 +34,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,14 +70,14 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { Future<Boolean> fenceResult = null; if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) { LOG.warn("Ledger fence request received for ledger: {} from address: {}", ledgerId, - channel.getRemoteAddress()); + channel.remoteAddress()); if (readRequest.hasMasterKey()) { byte[] masterKey = readRequest.getMasterKey().toByteArray(); fenceResult = requestProcessor.bookie.fenceLedger(ledgerId, masterKey); } else { LOG.error("Fence ledger request received without master key for ledger:{} from address: {}", - ledgerId, channel.getRemoteAddress()); + ledgerId, channel.remoteAddress()); throw BookieException.create(BookieException.Code.UnauthorizedAccessException); } } @@ -129,7 +130,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { LOG.error("IOException while reading entry:{} from ledger:{}", entryId, ledgerId); } catch (BookieException e) { LOG.error("Unauthorized access to ledger:{} while reading entry:{} in request from address: {}", - new Object[]{ledgerId, entryId, channel.getRemoteAddress()}); + new Object[]{ledgerId, entryId, channel.remoteAddress()}); status = StatusCode.EUA; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java index e9a4c13..0fbdbec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java @@ -31,13 +31,14 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.protobuf.ByteString; -class ReadLacProcessorV3 extends PacketProcessorBaseV3 { +import io.netty.channel.Channel; + +class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class); public ReadLacProcessorV3(Request request, Channel channel, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index 651c118..1418437 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -20,8 +20,9 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.Unpooled; + import java.nio.ByteBuffer; -import org.jboss.netty.buffer.ChannelBuffers; class ResponseBuilder { static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) { @@ -42,6 +43,6 @@ class ResponseBuilder { static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) { return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK, - r.getLedgerId(), r.getEntryId(), ChannelBuffers.wrappedBuffer(data)); + r.getLedgerId(), r.getEntryId(), Unpooled.wrappedBuffer(data)); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java deleted file mode 100644 index ab19ce0..0000000 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.bookkeeper.proto; - -import java.io.IOException; -import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory; - -/** - * Manages VM-local channels - * - * @author enrico.olivelli - */ -public class VMLocalChannelManager extends ChannelManager { - - private ChannelFactory channelFactory; - private BookieSocketAddress bookieAddress; - - @Override - public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException { - BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf); - this.channelFactory = new DefaultLocalServerChannelFactory(); - this.bookieAddress = bookieAddress; - ServerBootstrap jvmbootstrap = new ServerBootstrap(channelFactory); - jvmbootstrap.setPipelineFactory(bookiePipelineFactory); - - // use the same address 'name', so clients can find local Bookie still discovering them using ZK - Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress()); - LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress); - return jvmlisten; - } - - @Override - public void close() { - LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress); - if (channelFactory != null) { - channelFactory.releaseExternalResources(); - } - channelFactory = null; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index b314998..46f7f7d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -17,6 +17,8 @@ */ package org.apache.bookkeeper.proto; +import io.netty.channel.Channel; + import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -25,7 +27,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,10 @@ class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback { } catch (BookieException e) { LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e); rc = BookieProtocol.EUA; + } finally { + add.release(); } + if (rc != BookieProtocol.EOK) { requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 242ed81..e34e894 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.proto; +import io.netty.channel.Channel; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; @@ -32,7 +34,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java index 104f561..097a573 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java @@ -31,11 +31,12 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class WriteLacProcessorV3 extends PacketProcessorBaseV3 { +import io.netty.channel.Channel; + +class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private final static Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class); public WriteLacProcessorV3(Request request, Channel channel, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java new file mode 100644 index 0000000..3b4c83a --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java @@ -0,0 +1,468 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.bookkeeper.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.AbstractReferenceCountedByteBuf; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.ResourceLeakDetectorFactory; +import io.netty.util.ResourceLeakTracker; + +/** + * ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but doesn't allocate list to hold them. + */ +@SuppressWarnings("unchecked") +public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf { + + private ByteBuf b1; + private ByteBuf b2; + private final Handle<DoubleByteBuf> recyclerHandle; + + private static final Recycler<DoubleByteBuf> RECYCLER = new Recycler<DoubleByteBuf>() { + @Override + protected DoubleByteBuf newObject(Recycler.Handle<DoubleByteBuf> handle) { + return new DoubleByteBuf(handle); + } + }; + + private DoubleByteBuf(Handle<DoubleByteBuf> recyclerHandle) { + super(Integer.MAX_VALUE); + this.recyclerHandle = recyclerHandle; + } + + public static ByteBuf get(ByteBuf b1, ByteBuf b2) { + DoubleByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + + // Make sure the buffers are not deallocated as long as we hold them. Also, buffers can get retained/releases + // outside of DoubleByteBuf scope + buf.b1 = b1.retain(); + buf.b2 = b2.retain(); + buf.setIndex(0, b1.readableBytes() + b2.readableBytes()); + return toLeakAwareBuffer(buf); + } + + public ByteBuf getFirst() { + return b1; + } + + public ByteBuf getSecond() { + return b2; + } + + @Override + public boolean isDirect() { + return b1.isDirect() && b2.isDirect(); + } + + @Override + public boolean hasArray() { + // There's no single array available + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMemoryAddress() { + return false; + } + + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public int capacity() { + return b1.capacity() + b2.capacity(); + } + + @Override + public int readableBytes() { + return b1.readableBytes() + b2.readableBytes(); + } + + @Override + public int writableBytes() { + return 0; + } + + @Override + public DoubleByteBuf capacity(int newCapacity) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBufAllocator alloc() { + return PooledByteBufAllocator.DEFAULT; + } + + @Override + @Deprecated + public ByteOrder order() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public byte getByte(int index) { + if (index < b1.writerIndex()) { + return b1.getByte(index); + } else { + return b2.getByte(index - b1.writerIndex()); + } + } + + @Override + protected byte _getByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected short _getShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected short _getShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected long _getLongLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setShortLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setMediumLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setIntLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setLongLE(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getUnsignedMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected int _getInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + protected long _getLong(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + return getBytes(index, Unpooled.wrappedBuffer(dst), dstIndex, length); + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkDstIndex(index, length, dstIndex, dst.capacity()); + if (length == 0) { + return this; + } + + int b1Length = Math.min(length, b1.readableBytes() - index); + if (b1Length > 0) { + b1.getBytes(b1.readerIndex() + index, dst, dstIndex, b1Length); + dstIndex += b1Length; + length -= b1Length; + index = 0; + } else { + index -= b1.readableBytes(); + } + + if (length > 0) { + int b2Length = Math.min(length, b2.readableBytes() - index); + b2.getBytes(b2.readerIndex() + index, dst, dstIndex, b2Length); + } + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setInt(int index, int value) { + return (DoubleByteBuf) super.setInt(index, value); + } + + @Override + protected void _setInt(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setBytes(int index, ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int nioBufferCount() { + return b1.nioBufferCount() + b2.nioBufferCount(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + ByteBuffer dst = ByteBuffer.allocate(length); + ByteBuf b = Unpooled.wrappedBuffer(dst); + b.writerIndex(0); + getBytes(index, b, length); + return dst; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] { nioBuffer(index, length) }; + } + + @Override + public DoubleByteBuf discardReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + String result = super.toString(); + result = result.substring(0, result.length() - 1); + return result + ", components=2)"; + } + + @Override + public ByteBuffer[] nioBuffers() { + return nioBuffers(readerIndex(), readableBytes()); + } + + @Override + protected void deallocate() { + // Double release of buffer for the initial ref-count and the internal retain() when the DoubleByteBuf was + // created + b1.release(2); + b2.release(2); + b1 = b2 = null; + recyclerHandle.recycle(this); + } + + @Override + public ByteBuf unwrap() { + return null; + } + + private static final Logger log = LoggerFactory.getLogger(DoubleByteBuf.class); + + private static final ResourceLeakDetector<DoubleByteBuf> leakDetector = ResourceLeakDetectorFactory.instance() + .newResourceLeakDetector(DoubleByteBuf.class); + private static final Constructor<ByteBuf> simpleLeakAwareByteBufConstructor; + private static final Constructor<ByteBuf> advancedLeakAwareByteBufConstructor; + + static { + Constructor<ByteBuf> _simpleLeakAwareByteBufConstructor = null; + Constructor<ByteBuf> _advancedLeakAwareByteBufConstructor = null; + try { + Class<?> simpleLeakAwareByteBufClass = Class.forName("io.netty.buffer.SimpleLeakAwareByteBuf"); + _simpleLeakAwareByteBufConstructor = (Constructor<ByteBuf>) simpleLeakAwareByteBufClass + .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class); + _simpleLeakAwareByteBufConstructor.setAccessible(true); + + Class<?> advancedLeakAwareByteBufClass = Class.forName("io.netty.buffer.AdvancedLeakAwareByteBuf"); + _advancedLeakAwareByteBufConstructor = (Constructor<ByteBuf>) advancedLeakAwareByteBufClass + .getDeclaredConstructor(ByteBuf.class, ResourceLeakTracker.class); + _advancedLeakAwareByteBufConstructor.setAccessible(true); + } catch (Throwable t) { + log.error("Failed to use reflection to enable leak detection", t); + } finally { + simpleLeakAwareByteBufConstructor = _simpleLeakAwareByteBufConstructor; + advancedLeakAwareByteBufConstructor = _advancedLeakAwareByteBufConstructor; + } + } + + private static ByteBuf toLeakAwareBuffer(DoubleByteBuf buf) { + try { + ResourceLeakTracker<DoubleByteBuf> leak; + switch (ResourceLeakDetector.getLevel()) { + case DISABLED: + break; + + case SIMPLE: + leak = leakDetector.track(buf); + if (leak != null) { + return simpleLeakAwareByteBufConstructor.newInstance(buf, leak); + } + break; + case ADVANCED: + case PARANOID: + leak = leakDetector.track(buf); + if (leak != null) { + return advancedLeakAwareByteBufConstructor.newInstance(buf, leak); + } + break; + } + return buf; + } catch (Throwable t) { + // Catch reflection exception + throw new RuntimeException(t); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java index 5c028f0..e2b3c47 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java @@ -38,7 +38,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException; -import org.jboss.netty.channel.ChannelException; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -268,9 +267,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase { BookieServer bs2 = new BookieServer(conf); bs2.start(); fail("Should throw BindException, as the bk server is already running!"); - } catch (ChannelException ce) { - Assert.assertTrue("Should be caused by a bind exception", - ce.getCause() instanceof BindException); + } catch (BindException e) { + // Ok + } catch (IOException e) { + Assert.assertTrue("BKServer allowed duplicate Startups!", + e.getMessage().contains("bind")); } } @@ -292,7 +293,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase { fail("Should throw ConnectionLossException as ZKServer is not running!"); } catch (KeeperException.ConnectionLossException e) { // expected behaviour - } + } } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java index 9baa4e7..1dfa32e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java @@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.io.File; import java.io.IOException; @@ -116,14 +118,14 @@ public class BookieJournalTest { /** * Generate meta entry with given master key */ - private ByteBuffer generateMetaEntry(long ledgerId, byte[] masterKey) { + private ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) { ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length); bb.putLong(ledgerId); bb.putLong(Bookie.METAENTRY_ID_LEDGER_KEY); bb.putInt(masterKey.length); bb.put(masterKey); bb.flip(); - return bb; + return Unpooled.wrappedBuffer(bb); } private void writeJunkJournal(File journalDir) throws Exception { @@ -154,14 +156,15 @@ public class BookieJournalTest { byte[] data = "JournalTestData".getBytes(); long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; for (int i = 1; i <= numEntries; i++) { - ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data).toByteBuffer(); + ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data); lastConfirmed = i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); fc.write(lenBuff); - fc.write(packet); + fc.write(packet.nioBuffer()); + packet.release(); } } @@ -197,14 +200,15 @@ public class BookieJournalTest { Arrays.fill(data, (byte)'X'); long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; for (int i = 1; i <= numEntries; i++) { - ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data).toByteBuffer(); + ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data); lastConfirmed = i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); bc.write(lenBuff); - bc.write(packet); + bc.write(packet.nioBuffer()); + packet.release(); } bc.flush(true); @@ -225,19 +229,20 @@ public class BookieJournalTest { Arrays.fill(data, (byte)'X'); long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; for (int i = 0; i <= numEntries; i++) { - ByteBuffer packet; + ByteBuf packet; if (i == 0) { packet = generateMetaEntry(1, masterKey); } else { - packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data).toByteBuffer(); + packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length, data); } lastConfirmed = i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); bc.write(lenBuff); - bc.write(packet); + bc.write(packet.nioBuffer()); + packet.release(); } bc.flush(true); @@ -258,18 +263,19 @@ public class BookieJournalTest { Arrays.fill(data, (byte)'X'); long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; for (int i = 0; i <= numEntries; i++) { - ByteBuffer packet; + ByteBuf packet; if (i == 0) { packet = generateMetaEntry(1, masterKey); } else { - packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data).toByteBuffer(); + packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data); } lastConfirmed = i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); bc.write(lenBuff); - bc.write(packet); + bc.write(packet.nioBuffer()); + packet.release(); } // write fence key ByteBuffer packet = generateFenceEntry(1); @@ -296,20 +302,20 @@ public class BookieJournalTest { long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; long length = 0; for (int i = 0; i <= numEntries; i++) { - ByteBuffer packet; + ByteBuf packet; if (i == 0) { packet = generateMetaEntry(1, masterKey); } else { - packet = ClientUtil.generatePacket(1, i, lastConfirmed, - length, data, 0, i).toByteBuffer(); + packet = ClientUtil.generatePacket(1, i, lastConfirmed, length, data, 0, i); } lastConfirmed = i; length += i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); bc.write(lenBuff); - bc.write(packet); + bc.write(packet.nioBuffer()); + packet.release(); Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE); } // write fence key http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java index 84af7a7..f47e745 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java @@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; import java.io.BufferedWriter; import java.io.File; @@ -88,15 +89,16 @@ public class UpgradeTest extends BookKeeperClusterTestCase { long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID; for (int i = 1; i <= numEntries; i++) { - ByteBuffer packet = ClientUtil.generatePacket(ledgerId, i, lastConfirmed, - i*data.length, data).toByteBuffer(); + ByteBuf packet = ClientUtil.generatePacket(ledgerId, i, lastConfirmed, + i*data.length, data); lastConfirmed = i; ByteBuffer lenBuff = ByteBuffer.allocate(4); - lenBuff.putInt(packet.remaining()); + lenBuff.putInt(packet.readableBytes()); lenBuff.flip(); bc.write(lenBuff); - bc.write(packet); + bc.write(packet.nioBuffer()); + packet.release(); } bc.flush(true); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 17d63b3..9ec835d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -194,7 +194,7 @@ public class BookKeeperTest extends BaseTestCase { public void testCloseDuringOp() throws Exception { ClientConfiguration conf = new ClientConfiguration() .setZkServers(zkUtil.getZooKeeperConnectString()); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10; i++) { final BookKeeper client = new BookKeeper(conf); final CountDownLatch l = new CountDownLatch(1); final AtomicBoolean success = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java index b394dc2..3fc86ad 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java @@ -22,23 +22,10 @@ package org.apache.bookkeeper.client; */ import java.io.IOException; -import java.util.concurrent.Executors; import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; -import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; -import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; -import org.apache.bookkeeper.client.BKException.Code; -import org.apache.bookkeeper.proto.BookieClient; -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; /** * Test BookKeeperClient which allows access to members we don't http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java index d25bd70..ae0a07f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java @@ -30,13 +30,14 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase; -import org.jboss.netty.buffer.ChannelBuffer; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; @@ -483,7 +484,7 @@ public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase { } @Override - public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) { if (LOG.isDebugEnabled()) { LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " + entryId + " from " + ctx); }
