http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index d2608e7..419e3aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -21,26 +21,26 @@ package org.apache.bookkeeper.proto; import com.google.protobuf.ByteString; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + import java.io.IOException; import java.net.SocketAddress; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; + import org.apache.bookkeeper.auth.AuthCallbacks; import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.AuthToken; - import org.apache.bookkeeper.auth.BookieAuthProvider; import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.DefaultExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.bookkeeper.client.ClientConnectionPeer; @@ -49,7 +49,7 @@ import org.apache.bookkeeper.bookie.BookieConnectionPeer; class AuthHandler { static final Logger LOG = LoggerFactory.getLogger(AuthHandler.class); - static class ServerSideHandler extends SimpleChannelHandler { + static class ServerSideHandler extends ChannelInboundHandlerAdapter { volatile boolean authenticated = false; final BookieAuthProvider.Factory authProviderFactory; final BookieConnectionPeer connectionPeer; @@ -62,86 +62,82 @@ class AuthHandler { } @Override - public void channelOpen(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { authProvider = authProviderFactory.newProvider(connectionPeer, new AuthHandshakeCompleteCallback()); - super.channelOpen(ctx, e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (authProvider != null) { authProvider.close(); } - super.channelClosed(ctx, e); + super.channelInactive(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, - MessageEvent e) - throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (authProvider == null) { // close the channel, authProvider should only be // null if the other end of line is an InetSocketAddress // anything else is strange, and we don't want to deal // with it - ctx.getChannel().close(); + ctx.channel().close(); return; } - Object event = e.getMessage(); if (authenticated) { - super.messageReceived(ctx, e); - } else if (event instanceof BookieProtocol.AuthRequest) { // pre-PB-client - BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest)event; + super.channelRead(ctx, msg); + } else if (msg instanceof BookieProtocol.AuthRequest) { // pre-PB-client + BookieProtocol.AuthRequest req = (BookieProtocol.AuthRequest) msg; assert (req.getOpCode() == BookieProtocol.AUTH); - if (checkAuthPlugin(req.getAuthMessage(), ctx.getChannel())) { + if (checkAuthPlugin(req.getAuthMessage(), ctx.channel())) { byte[] payload = req .getAuthMessage() .getPayload() .toByteArray(); authProvider.process(AuthToken.wrap(payload), - new AuthResponseCallbackLegacy(req, ctx.getChannel())); + new AuthResponseCallbackLegacy(req, ctx.channel())); } else { - ctx.getChannel().close(); + ctx.channel().close(); } - } else if (event instanceof BookieProtocol.Request) { - BookieProtocol.Request req = (BookieProtocol.Request)event; + } else if (msg instanceof BookieProtocol.Request) { + BookieProtocol.Request req = (BookieProtocol.Request) msg; if (req.getOpCode() == BookieProtocol.ADDENTRY) { - ctx.getChannel().write( + ctx.channel().writeAndFlush( new BookieProtocol.AddResponse( req.getProtocolVersion(), BookieProtocol.EUA, req.getLedgerId(), req.getEntryId())); } else if (req.getOpCode() == BookieProtocol.READENTRY) { - ctx.getChannel().write( + ctx.channel().writeAndFlush( new BookieProtocol.ReadResponse( req.getProtocolVersion(), BookieProtocol.EUA, req.getLedgerId(), req.getEntryId())); } else { - ctx.getChannel().close(); + ctx.channel().close(); } - } else if (event instanceof BookkeeperProtocol.Request) { // post-PB-client - BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)event; + } else if (msg instanceof BookkeeperProtocol.Request) { // post-PB-client + BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg; if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH && req.hasAuthRequest() - && checkAuthPlugin(req.getAuthRequest(), ctx.getChannel())) { + && checkAuthPlugin(req.getAuthRequest(), ctx.channel())) { byte[] payload = req .getAuthRequest() .getPayload() .toByteArray(); authProvider.process(AuthToken.wrap(payload), - new AuthResponseCallback(req, ctx.getChannel(), authProviderFactory.getPluginName())); + new AuthResponseCallback(req, ctx.channel(), authProviderFactory.getPluginName())); } else { BookkeeperProtocol.Response.Builder builder = BookkeeperProtocol.Response.newBuilder() .setHeader(req.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EUA); - ctx.getChannel().write(builder.build()); + ctx.channel().writeAndFlush(builder.build()); } } else { // close the channel, junk coming over it - ctx.getChannel().close(); + ctx.channel().close(); } } @@ -177,7 +173,7 @@ class AuthHandler { .setAuthPluginName(req.authMessage.getAuthPluginName()) .setPayload(ByteString.copyFrom(newam.getData())) .build(); - channel.write(new BookieProtocol.AuthResponse(req.getProtocolVersion(), + channel.writeAndFlush(new BookieProtocol.AuthResponse(req.getProtocolVersion(), message)); } } @@ -202,7 +198,7 @@ class AuthHandler { LOG.error("Error processing auth message, closing connection"); builder.setStatus(BookkeeperProtocol.StatusCode.EUA); - channel.write(builder.build()); + channel.writeAndFlush(builder.build()); channel.close(); return; } else { @@ -214,7 +210,7 @@ class AuthHandler { .build(); builder.setStatus(BookkeeperProtocol.StatusCode.EOK) .setAuthResponse(message); - channel.write(builder.build()); + channel.writeAndFlush(builder.build()); } } } @@ -232,12 +228,12 @@ class AuthHandler { } } - static class ClientSideHandler extends SimpleChannelHandler { + static class ClientSideHandler extends ChannelDuplexHandler { volatile boolean authenticated = false; final ClientAuthProvider.Factory authProviderFactory; ClientAuthProvider authProvider; final AtomicLong transactionIdGenerator; - final Queue<MessageEvent> waitingForAuth = new ConcurrentLinkedQueue<MessageEvent>(); + final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>(); final ClientConnectionPeer connectionPeer; ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, @@ -250,36 +246,30 @@ class AuthHandler { } @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) - throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { authProvider = authProviderFactory.newProvider(connectionPeer, new AuthHandshakeCompleteCallback(ctx)); authProvider.init(new AuthRequestCallback(ctx, authProviderFactory.getPluginName())); - super.channelConnected(ctx, e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (authProvider != null) { authProvider.close(); } - super.channelClosed(ctx, e); + super.channelInactive(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, - MessageEvent e) - throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { assert (authProvider != null); - Object event = e.getMessage(); - if (authenticated) { - super.messageReceived(ctx, e); - } else if (event instanceof BookkeeperProtocol.Response) { - BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response)event; + super.channelRead(ctx, msg); + } else if (msg instanceof BookkeeperProtocol.Response) { + BookkeeperProtocol.Response resp = (BookkeeperProtocol.Response) msg; if (resp.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) { if (resp.getStatus() != BookkeeperProtocol.StatusCode.EOK) { authenticationError(ctx, resp.getStatus().getNumber()); @@ -287,7 +277,7 @@ class AuthHandler { assert (resp.hasAuthResponse()); BookkeeperProtocol.AuthMessage am = resp.getAuthResponse(); if (AuthProviderFactoryFactory.authenticationDisabledPluginName.equals(am.getAuthPluginName())){ - SocketAddress remote = ctx.getChannel().getRemoteAddress(); + SocketAddress remote = ctx.channel().remoteAddress(); LOG.info("Authentication is not enabled." + "Considering this client {0} authenticated", remote); AuthHandshakeCompleteCallback authHandshakeCompleteCallback @@ -307,28 +297,26 @@ class AuthHandler { } @Override - public void writeRequested(ChannelHandlerContext ctx, - MessageEvent e) - throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { synchronized (this) { if (authenticated) { - super.writeRequested(ctx, e); - } else if (e.getMessage() instanceof BookkeeperProtocol.Request) { + super.write(ctx, msg, promise); + } else if (msg instanceof BookkeeperProtocol.Request) { // let auth messages through, queue the rest - BookkeeperProtocol.Request req = (BookkeeperProtocol.Request)e.getMessage(); + BookkeeperProtocol.Request req = (BookkeeperProtocol.Request) msg; if (req.getHeader().getOperation() == BookkeeperProtocol.OperationType.AUTH) { - super.writeRequested(ctx, e); + super.write(ctx, msg, promise); } else { - waitingForAuth.add(e); + waitingForAuth.add(msg); } - } else if (e.getMessage() instanceof BookieProtocol.Request) { + } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest - BookieProtocol.Request req = (BookieProtocol.Request)e.getMessage(); + BookieProtocol.Request req = (BookieProtocol.Request)msg; if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) { - super.writeRequested(ctx, e); + super.write(ctx, msg, promise); } else { - waitingForAuth.add(e); + waitingForAuth.add(msg); } } // else just drop } @@ -340,9 +328,7 @@ class AuthHandler { void authenticationError(ChannelHandlerContext ctx, int errorCode) { LOG.error("Error processing auth message, erroring connection {}", errorCode); - ctx.sendUpstream(new DefaultExceptionEvent(ctx.getChannel(), - new AuthenticationException( - "Auth failed with error " + errorCode))); + ctx.fireExceptionCaught(new AuthenticationException("Auth failed with error " + errorCode)); } class AuthRequestCallback implements AuthCallbacks.GenericCallback<AuthToken> { @@ -351,7 +337,7 @@ class AuthHandler { String pluginName; AuthRequestCallback(ChannelHandlerContext ctx, String pluginName) { - this.channel = ctx.getChannel(); + this.channel = ctx.channel(); this.ctx = ctx; this.pluginName = pluginName; } @@ -377,7 +363,7 @@ class AuthHandler { .setHeader(header) .setAuthRequest(message); - channel.write(builder.build()); + channel.writeAndFlush(builder.build()); } } @@ -392,10 +378,10 @@ class AuthHandler { if (rc == BKException.Code.OK) { synchronized (this) { authenticated = true; - MessageEvent e = waitingForAuth.poll(); - while (e != null) { - ctx.sendDownstream(e); - e = waitingForAuth.poll(); + Object msg = waitingForAuth.poll(); + while (msg != null) { + ctx.writeAndFlush(msg); + msg = waitingForAuth.poll(); } } } else { @@ -406,6 +392,7 @@ class AuthHandler { } } + @SuppressWarnings("serial") static class AuthenticationException extends IOException { AuthenticationException(String reason) { super(reason);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index ce85aef..4fb08e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -48,13 +48,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +55,14 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ExtensionRegistry; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + /** * Implements the client-side part of the BookKeeper protocol. * @@ -73,7 +74,7 @@ public class BookieClient implements PerChannelBookieClientFactory { AtomicLong totalBytesOutstanding = new AtomicLong(); OrderedSafeExecutor executor; - ClientSocketChannelFactory channelFactory; + EventLoopGroup eventLoopGroup; final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels = new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>(); final HashedWheelTimer requestTimer; @@ -89,15 +90,15 @@ public class BookieClient implements PerChannelBookieClientFactory { private final long bookieErrorThresholdPerInterval; - public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, + public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, OrderedSafeExecutor executor) throws IOException { - this(conf, channelFactory, executor, NullStatsLogger.INSTANCE); + this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE); } - public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, + public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException { this.conf = conf; - this.channelFactory = channelFactory; + this.eventLoopGroup = eventLoopGroup; this.executor = executor; this.closed = false; this.closeLock = new ReentrantReadWriteLock(); @@ -141,7 +142,7 @@ public class BookieClient implements PerChannelBookieClientFactory { @Override public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool) { - return new PerChannelBookieClient(conf, executor, channelFactory, address, requestTimer, statsLogger, + return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, requestTimer, statsLogger, authProviderFactory, registry, pcbcPool); } @@ -172,7 +173,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, - final long lac, final ChannelBuffer toSend, final WriteLacCallback cb, final Object ctx) { + final long lac, final ByteBuf toSend, final WriteLacCallback cb, final Object ctx) { closeLock.readLock().lock(); try { final PerChannelBookieClientPool client = lookupClient(addr, lac); @@ -182,6 +183,7 @@ public class BookieClient implements PerChannelBookieClientFactory { return; } + toSend.retain(); client.obtain(new GenericCallback<PerChannelBookieClient>() { @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { @@ -196,19 +198,20 @@ public class BookieClient implements PerChannelBookieClientFactory { } catch (RejectedExecutionException re) { cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); } - return; + } else { + pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); } - pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); + + toSend.release(); } - }); + }, ledgerId); } finally { closeLock.readLock().unlock(); } } public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, - final long entryId, - final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) { + final long entryId, final ByteBuf toSend, final WriteCallback cb, final Object ctx, final int options) { closeLock.readLock().lock(); try { final PerChannelBookieClientPool client = lookupClient(addr, entryId); @@ -218,6 +221,10 @@ public class BookieClient implements PerChannelBookieClientFactory { return; } + // Retain the buffer, since the connection could be obtained after the PendingApp might have already + // failed + toSend.retain(); + client.obtain(new GenericCallback<PerChannelBookieClient>() { @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { @@ -233,11 +240,12 @@ public class BookieClient implements PerChannelBookieClientFactory { cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx); } - return; + } else { + pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options); } - pcbc.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options); + toSend.release(); } - }); + }, ledgerId); } finally { closeLock.readLock().unlock(); } @@ -277,7 +285,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } pcbc.readEntryAndFenceLedger(ledgerId, masterKey, entryId, cb, ctx); } - }); + }, ledgerId); } finally { closeLock.readLock().unlock(); } @@ -310,7 +318,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } pcbc.readLac(ledgerId, cb, ctx); } - }); + }, ledgerId); } finally { closeLock.readLock().unlock(); } @@ -346,7 +354,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } pcbc.readEntry(ledgerId, entryId, cb, ctx); } - }); + }, ledgerId); } finally { closeLock.readLock().unlock(); } @@ -379,7 +387,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } pcbc.getBookieInfo(requested, cb, ctx); } - }); + }, requested); } finally { closeLock.readLock().unlock(); } @@ -458,26 +466,21 @@ public class BookieClient implements PerChannelBookieClientFactory { Counter counter = new Counter(); byte hello[] = "hello".getBytes(UTF_8); long ledger = Long.parseLong(args[2]); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(tfb.setNameFormat( - "BookKeeper-NIOBoss-%d").build()), - Executors.newCachedThreadPool(tfb.setNameFormat( - "BookKeeper-NIOWorker-%d").build())); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() .name("BookieClientWorker") .numThreads(1) .build(); - BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1])); for (int i = 0; i < 100000; i++) { counter.inc(); - bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter, 0); + bc.addEntry(addr, ledger, new byte[0], i, Unpooled.wrappedBuffer(hello), cb, counter, 0); } counter.wait(0); System.out.println("Total = " + counter.total()); - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index 2c6dd3a..cf7d419 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -21,36 +21,56 @@ package org.apache.bookkeeper.proto; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.auth.BookieAuthProvider; import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.commons.lang.SystemUtils; import org.apache.zookeeper.KeeperException; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.protobuf.ExtensionRegistry; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; + import com.google.common.annotations.VisibleForTesting; import java.net.SocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; import org.apache.bookkeeper.auth.BookKeeperPrincipal; import org.apache.bookkeeper.bookie.BookieConnectionPeer; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.bookkeeper.net.BookieSocketAddress; /** * Netty server for serving bookie requests @@ -61,12 +81,14 @@ class BookieNettyServer { final int maxFrameSize; final ServerConfiguration conf; - final List<ChannelManager> channels = new ArrayList<>(); + final EventLoopGroup eventLoopGroup; + final EventLoopGroup jvmEventLoopGroup; final RequestProcessor requestProcessor; - final ChannelGroup allChannels = new CleanupChannelGroup(); final AtomicBoolean isRunning = new AtomicBoolean(false); final Object suspensionLock = new Object(); - boolean suspended = false; + volatile boolean suspended = false; + ChannelGroup allChannels; + final BookieSocketAddress bookieAddress; final BookieAuthProvider.Factory authProviderFactory; final BookieProtoEncoding.ResponseEncoder responseEncoder; @@ -85,24 +107,42 @@ class BookieNettyServer { requestDecoder = new BookieProtoEncoding.RequestDecoder(registry); if (!conf.isDisableServerSocketBind()) { - channels.add(new NioServerSocketChannelManager()); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("bookie-io-%s").build(); + final int numThreads = Runtime.getRuntime().availableProcessors() * 2; + + EventLoopGroup eventLoopGroup; + if (SystemUtils.IS_OS_LINUX) { + try { + eventLoopGroup = new EpollEventLoopGroup(numThreads, threadFactory); + } catch (ExceptionInInitializerError | NoClassDefFoundError | UnsatisfiedLinkError e) { + LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", e.getMessage()); + eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory); + } + } else { + eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory); + } + + this.eventLoopGroup = eventLoopGroup; + allChannels = new CleanupChannelGroup(eventLoopGroup); + } else { + this.eventLoopGroup = null; } + if (conf.isEnableLocalTransport()) { - channels.add(new VMLocalChannelManager()); + jvmEventLoopGroup = new DefaultEventLoopGroup(); + allChannels = new CleanupChannelGroup(jvmEventLoopGroup); + } else { + jvmEventLoopGroup = null; } - try { - for (ChannelManager channel : channels) { - Channel nettyChannel = channel.start(conf, new BookiePipelineFactory()); - allChannels.add(nettyChannel); - } - } catch (IOException bindError) { - // clean up all the channels, if this constructor throws an exception the caller code will - // not be able to call close(), leading to a resource leak - for (ChannelManager channel : channels) { - channel.close(); - } - throw bindError; + + bookieAddress = Bookie.getBookieAddress(conf); + InetSocketAddress bindAddress; + if (conf.getListeningInterface() == null) { + bindAddress = new InetSocketAddress(conf.getBookiePort()); + } else { + bindAddress = bookieAddress.getSocketAddress(); } + listenOn(bindAddress, bookieAddress); } boolean isRunning() { @@ -113,7 +153,19 @@ class BookieNettyServer { void suspendProcessing() { synchronized (suspensionLock) { suspended = true; - allChannels.setReadable(false).awaitUninterruptibly(); + for (Channel channel : allChannels) { + // To suspend processing in the bookie, submit a task + // that keeps the event loop busy until resume is + // explicitely invoked + channel.eventLoop().submit(() -> { + while (suspended && isRunning()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + }); + } } } @@ -121,11 +173,111 @@ class BookieNettyServer { void resumeProcessing() { synchronized (suspensionLock) { suspended = false; - allChannels.setReadable(true).awaitUninterruptibly(); + for (Channel channel : allChannels) { + channel.config().setAutoRead(true); + } suspensionLock.notifyAll(); } } + private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException { + if (!conf.isDisableServerSocketBind()) { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true)); + bootstrap.group(eventLoopGroup, eventLoopGroup); + bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay()); + bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger()); + bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, + new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(), + conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax())); + + if (eventLoopGroup instanceof EpollEventLoopGroup) { + bootstrap.channel(EpollServerSocketChannel.class); + } else { + bootstrap.channel(NioServerSocketChannel.class); + } + + bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + synchronized (suspensionLock) { + while (suspended) { + suspensionLock.wait(); + } + } + + BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler(); + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); + pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); + + pipeline.addLast("bookieProtoDecoder", requestDecoder); + pipeline.addLast("bookieProtoEncoder", responseEncoder); + pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory)); + + ChannelInboundHandler requestHandler = isRunning.get() + ? new BookieRequestHandler(conf, requestProcessor, allChannels) : new RejectRequestHandler(); + pipeline.addLast("bookieRequestHandler", requestHandler); + + } + }); + + // Bind and start to accept incoming connections + bootstrap.bind(address.getAddress(), address.getPort()).sync(); + } + + if (conf.isEnableLocalTransport()) { + ServerBootstrap jvmBootstrap = new ServerBootstrap(); + jvmBootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true)); + jvmBootstrap.group(jvmEventLoopGroup, jvmEventLoopGroup); + jvmBootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay()); + jvmBootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive()); + jvmBootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger()); + jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, + new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(), + conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax())); + + if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) { + jvmBootstrap.channel(LocalServerChannel.class); + } else if (jvmEventLoopGroup instanceof EpollEventLoopGroup) { + jvmBootstrap.channel(EpollServerSocketChannel.class); + } else { + jvmBootstrap.channel(NioServerSocketChannel.class); + } + + jvmBootstrap.childHandler(new ChannelInitializer<LocalChannel>() { + @Override + protected void initChannel(LocalChannel ch) throws Exception { + synchronized (suspensionLock) { + while (suspended) { + suspensionLock.wait(); + } + } + + BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler(); + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); + pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); + + pipeline.addLast("bookieProtoDecoder", requestDecoder); + pipeline.addLast("bookieProtoEncoder", responseEncoder); + pipeline.addLast("bookieAuthHandler", new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory)); + + ChannelInboundHandler requestHandler = isRunning.get() + ? new BookieRequestHandler(conf, requestProcessor, allChannels) : new RejectRequestHandler(); + pipeline.addLast("bookieRequestHandler", requestHandler); + + } + }); + + // use the same address 'name', so clients can find local Bookie still discovering them using ZK + jvmBootstrap.bind(bookieAddress.getLocalAddress()).sync(); + LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress); + } + } + void start() { isRunning.set(true); } @@ -134,13 +286,23 @@ class BookieNettyServer { LOG.info("Shutting down BookieNettyServer"); isRunning.set(false); allChannels.close().awaitUninterruptibly(); - for (ChannelManager channel : channels) { - channel.close(); + + if (eventLoopGroup != null) { + try { + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.MILLISECONDS).await(); + } catch (InterruptedException e) { + /// OK + } + } + if (jvmEventLoopGroup != null) { + LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress); + jvmEventLoopGroup.shutdownGracefully(); } + authProviderFactory.close(); } - class BookieSideConnectionPeerContextHandler extends SimpleChannelHandler { + class BookieSideConnectionPeerContextHandler extends ChannelInboundHandlerAdapter { final BookieConnectionPeer connectionPeer; volatile Channel channel; @@ -152,7 +314,7 @@ class BookieNettyServer { public SocketAddress getRemoteAddr() { Channel c = channel; if (c != null) { - return c.getRemoteAddress(); + return c.remoteAddress(); } else { return null; } @@ -191,56 +353,25 @@ class BookieNettyServer { } @Override - public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - channel = ctx.getChannel(); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); } } - class BookiePipelineFactory implements ChannelPipelineFactory { - - public ChannelPipeline getPipeline() throws Exception { - synchronized (suspensionLock) { - while (suspended) { - suspensionLock.wait(); - } - } - BookieSideConnectionPeerContextHandler contextHandler = new BookieSideConnectionPeerContextHandler(); - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("lengthbaseddecoder", - new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); - - pipeline.addLast("bookieProtoDecoder", requestDecoder); - pipeline.addLast("bookieProtoEncoder", responseEncoder); - pipeline.addLast("bookieAuthHandler", - new AuthHandler.ServerSideHandler(contextHandler.getConnectionPeer(), authProviderFactory)); - - SimpleChannelHandler requestHandler = isRunning.get() - ? new BookieRequestHandler(conf, requestProcessor, allChannels) - : new RejectRequestHandler(); - - pipeline.addLast("bookieRequestHandler", requestHandler); - pipeline.addLast("contextHandler", contextHandler); - return pipeline; - } - } - - private static class RejectRequestHandler extends SimpleChannelHandler { - + private static class RejectRequestHandler extends ChannelInboundHandlerAdapter { @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ctx.getChannel().close(); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.channel().close(); } - } private static class CleanupChannelGroup extends DefaultChannelGroup { private AtomicBoolean closed = new AtomicBoolean(false); - CleanupChannelGroup() { - super("BookieChannelGroup"); + public CleanupChannelGroup(EventLoopGroup eventLoopGroup) { + super("BookieChannelGroup", eventLoopGroup.next()); } @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 683a6fb..148b31d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -20,23 +20,30 @@ */ package org.apache.bookkeeper.proto; -import com.google.protobuf.ByteString; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.InvalidProtocolBufferException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.io.IOException; +import java.util.List; -import org.jboss.netty.buffer.ChannelBufferFactory; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +import org.apache.bookkeeper.util.DoubleByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; + public class BookieProtoEncoding { private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class); @@ -50,7 +57,7 @@ public class BookieProtoEncoding { * @return encode buffer. * @throws Exception */ - public Object encode(Object object, ChannelBufferFactory factory) throws Exception; + public Object encode(Object object, ByteBufAllocator allocator) throws Exception; /** * Decode a <i>packet</i> into an object. @@ -60,7 +67,7 @@ public class BookieProtoEncoding { * @return parsed object. * @throws Exception */ - public Object decode(ChannelBuffer packet) throws Exception; + public Object decode(ByteBuf packet) throws Exception; } @@ -72,7 +79,7 @@ public class BookieProtoEncoding { } @Override - public Object encode(Object msg, ChannelBufferFactory bufferFactory) + public Object encode(Object msg, ByteBufAllocator allocator) throws Exception { if (!(msg instanceof BookieProtocol.Request)) { return msg; @@ -82,10 +89,10 @@ public class BookieProtoEncoding { BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r; int totalHeaderSize = 4 // for the header + BookieProtocol.MASTER_KEY_LENGTH; // for the master key - ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize); + ByteBuf buf = allocator.buffer(totalHeaderSize); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); - return ChannelBuffers.wrappedBuffer(buf, ar.getData()); + return DoubleByteBuf.get(buf, ar.getData()); } else if (r instanceof BookieProtocol.ReadRequest) { int totalHeaderSize = 4 // for request type + 8 // for ledgerId @@ -94,7 +101,7 @@ public class BookieProtoEncoding { totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH; } - ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize); + ByteBuf buf = allocator.buffer(totalHeaderSize); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); buf.writeLong(r.getLedgerId()); buf.writeLong(r.getEntryId()); @@ -107,11 +114,11 @@ public class BookieProtoEncoding { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest)r).getAuthMessage(); int totalHeaderSize = 4; // for request type int totalSize = totalHeaderSize + am.getSerializedSize(); - ChannelBuffer buf = bufferFactory.getBuffer(totalSize); + ByteBuf buf = allocator.buffer(totalSize); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt()); - ChannelBufferOutputStream bufStream = new ChannelBufferOutputStream(buf); + ByteBufOutputStream bufStream = new ByteBufOutputStream(buf); am.writeTo(bufStream); return buf; } else { @@ -120,7 +127,7 @@ public class BookieProtoEncoding { } @Override - public Object decode(ChannelBuffer packet) + public Object decode(ByteBuf packet) throws Exception { PacketHeader h = PacketHeader.fromInt(packet.readInt()); @@ -138,12 +145,12 @@ public class BookieProtoEncoding { masterKey = new byte[BookieProtocol.MASTER_KEY_LENGTH]; packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH); - ChannelBuffer bb = packet.duplicate(); - - ledgerId = bb.readLong(); - entryId = bb.readLong(); - return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, - flags, masterKey, packet.slice()); + // Read ledger and entry id without advancing the reader index + packet.markReaderIndex(); + ledgerId = packet.readLong(); + entryId = packet.readLong(); + packet.resetReaderIndex(); + return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, flags, masterKey, packet.retain()); case BookieProtocol.READENTRY: ledgerId = packet.readLong(); entryId = packet.readLong(); @@ -159,7 +166,7 @@ public class BookieProtoEncoding { case BookieProtocol.AUTH: BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); - builder.mergeFrom(new ChannelBufferInputStream(packet), extensionRegistry); + builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry); return new BookieProtocol.AuthRequest(h.getVersion(), builder.build()); } return packet; @@ -174,13 +181,13 @@ public class BookieProtoEncoding { } @Override - public Object encode(Object msg, ChannelBufferFactory bufferFactory) + public Object encode(Object msg, ByteBufAllocator allocator) throws Exception { if (!(msg instanceof BookieProtocol.Response)) { return msg; } BookieProtocol.Response r = (BookieProtocol.Response)msg; - ChannelBuffer buf = bufferFactory.getBuffer(24); + ByteBuf buf = allocator.buffer(24); buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), (short)0).toInt()); @@ -192,8 +199,7 @@ public class BookieProtoEncoding { BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r; if (rr.hasData()) { - return ChannelBuffers.wrappedBuffer(buf, - ChannelBuffers.wrappedBuffer(rr.getData())); + return DoubleByteBuf.get(buf, rr.getData()); } else { return buf; } @@ -205,15 +211,14 @@ public class BookieProtoEncoding { return buf; } else if (msg instanceof BookieProtocol.AuthResponse) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage(); - return ChannelBuffers.wrappedBuffer(buf, - ChannelBuffers.wrappedBuffer(am.toByteArray())); + return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray())); } else { LOG.error("Cannot encode unknown response type {}", msg.getClass().getName()); return msg; } } @Override - public Object decode(ChannelBuffer buffer) + public Object decode(ByteBuf buffer) throws Exception { int rc; long ledgerId, entryId; @@ -240,7 +245,7 @@ public class BookieProtoEncoding { ledgerId, entryId); } case BookieProtocol.AUTH: - ChannelBufferInputStream bufStream = new ChannelBufferInputStream(buffer); + ByteBufInputStream bufStream = new ByteBufInputStream(buffer); BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); builder.mergeFrom(bufStream, extensionRegistry); @@ -260,15 +265,14 @@ public class BookieProtoEncoding { } @Override - public Object decode(ChannelBuffer packet) throws Exception { - return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet), - extensionRegistry); + public Object decode(ByteBuf packet) throws Exception { + return BookkeeperProtocol.Request.parseFrom(new ByteBufInputStream(packet), extensionRegistry); } @Override - public Object encode(Object msg, ChannelBufferFactory factory) throws Exception { + public Object encode(Object msg, ByteBufAllocator allocator) throws Exception { BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg; - return ChannelBuffers.wrappedBuffer(request.toByteArray()); + return serializeProtobuf(request, allocator); } } @@ -281,20 +285,37 @@ public class BookieProtoEncoding { } @Override - public Object decode(ChannelBuffer packet) throws Exception { - return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet), + public Object decode(ByteBuf packet) throws Exception { + return BookkeeperProtocol.Response.parseFrom(new ByteBufInputStream(packet), extensionRegistry); } @Override - public Object encode(Object msg, ChannelBufferFactory factory) throws Exception { + public Object encode(Object msg, ByteBufAllocator allocator) throws Exception { BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg; - return ChannelBuffers.wrappedBuffer(response.toByteArray()); + return serializeProtobuf(response, allocator); + } + + } + + private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator allocator) { + int size = msg.getSerializedSize(); + ByteBuf buf = allocator.heapBuffer(size, size); + + try { + msg.writeTo(CodedOutputStream.newInstance(buf.array(), buf.arrayOffset() + buf.writerIndex(), size)); + } catch (IOException e) { + // This is in-memory serialization, should not fail + throw new RuntimeException(e); } + // Advance writer idx + buf.writerIndex(buf.capacity()); + return buf; } - public static class RequestEncoder extends OneToOneEncoder { + @Sharable + public static class RequestEncoder extends MessageToMessageEncoder<Object> { final EnDecoder REQ_PREV3; final EnDecoder REQ_V3; @@ -305,23 +326,23 @@ public class BookieProtoEncoding { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) - throws Exception { + protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("Encode request {} to channel {}.", msg, channel); + LOG.debug("Encode request {} to channel {}.", msg, ctx.channel()); } if (msg instanceof BookkeeperProtocol.Request) { - return REQ_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + out.add(REQ_V3.encode(msg, ctx.alloc())); } else if (msg instanceof BookieProtocol.Request) { - return REQ_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + out.add(REQ_PREV3.encode(msg, ctx.alloc())); } else { - LOG.error("Invalid request to encode to {}: {}", channel, msg.getClass().getName()); - return msg; + LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName()); + out.add(msg); } } } - public static class RequestDecoder extends OneToOneDecoder { + @Sharable + public static class RequestDecoder extends MessageToMessageDecoder<Object> { final EnDecoder REQ_PREV3; final EnDecoder REQ_V3; @@ -331,31 +352,32 @@ public class BookieProtoEncoding { } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) - throws Exception { + protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("Received request {} from channel {} to decode.", msg, channel); + LOG.debug("Received request {} from channel {} to decode.", msg, ctx.channel()); } - if (!(msg instanceof ChannelBuffer)) { - return msg; + if (!(msg instanceof ByteBuf)) { + out.add(msg); + return; } - ChannelBuffer buffer = (ChannelBuffer) msg; + ByteBuf buffer = (ByteBuf) msg; try { buffer.markReaderIndex(); try { - return REQ_V3.decode(buffer); + out.add(REQ_V3.decode(buffer)); } catch (InvalidProtocolBufferException e) { buffer.resetReaderIndex(); - return REQ_PREV3.decode(buffer); + out.add(REQ_PREV3.decode(buffer)); } } catch (Exception e) { - LOG.error("Failed to decode a request from {} : ", channel, e); - throw e; + LOG.error("Failed to decode a request from {} : ", ctx.channel(), e); + ctx.close(); } } } - public static class ResponseEncoder extends OneToOneEncoder { + @Sharable + public static class ResponseEncoder extends MessageToMessageEncoder<Object> { final EnDecoder REP_PREV3; final EnDecoder REP_V3; @@ -365,23 +387,24 @@ public class BookieProtoEncoding { } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) + protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("Encode response {} to channel {}.", msg, channel); + LOG.debug("Encode response {} to channel {}.", msg, ctx.channel()); } if (msg instanceof BookkeeperProtocol.Response) { - return REP_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + out.add(REP_V3.encode(msg, ctx.alloc())); } else if (msg instanceof BookieProtocol.Response) { - return REP_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory()); + out.add(REP_PREV3.encode(msg, ctx.alloc())); } else { - LOG.error("Invalid response to encode to {}: {}", channel, msg.getClass().getName()); - return msg; + LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName()); + out.add(msg); } } } - public static class ResponseDecoder extends OneToOneDecoder { + @Sharable + public static class ResponseDecoder extends MessageToMessageDecoder<Object> { final EnDecoder REP_PREV3; final EnDecoder REP_V3; @@ -391,26 +414,25 @@ public class BookieProtoEncoding { } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) - throws Exception { + protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("Received response {} from channel {} to decode.", msg, channel); + LOG.debug("Received response {} from channel {} to decode.", msg, ctx.channel()); } - if (!(msg instanceof ChannelBuffer)) { - return msg; + if (!(msg instanceof ByteBuf)) { + out.add(msg); } - ChannelBuffer buffer = (ChannelBuffer) msg; + ByteBuf buffer = (ByteBuf) msg; try { buffer.markReaderIndex(); try { - return REP_V3.decode(buffer); + out.add(REP_V3.decode(buffer)); } catch (InvalidProtocolBufferException e) { buffer.resetReaderIndex(); - return REP_PREV3.decode(buffer); + out.add(REP_PREV3.decode(buffer)); } } catch (Exception e) { - LOG.error("Failed to decode a response from channel {} : ", channel, e); - throw e; + LOG.error("Failed to decode a response from channel {} : ", ctx.channel(), e); + ctx.close(); } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 1191d3c..094daab 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -21,7 +21,8 @@ package org.apache.bookkeeper.proto; * */ -import org.jboss.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; + import java.nio.ByteBuffer; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; @@ -249,25 +250,29 @@ public interface BookieProtocol { } static class AddRequest extends Request { - final ChannelBuffer data; + final ByteBuf data; AddRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey, ChannelBuffer data) { + short flags, byte[] masterKey, ByteBuf data) { super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey); - this.data = data; + this.data = data.retain(); } - ChannelBuffer getData() { + ByteBuf getData() { return data; } ByteBuffer getDataAsByteBuffer() { - return data.toByteBuffer().slice(); + return data.nioBuffer().slice(); } boolean isRecoveryAdd() { return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; } + + void release() { + data.release(); + } } static class ReadRequest extends Request { @@ -342,14 +347,14 @@ public interface BookieProtocol { } static class ReadResponse extends Response { - final ChannelBuffer data; + final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); this.data = null; } - ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ChannelBuffer data) { + ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) { super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); this.data = data; } @@ -358,7 +363,7 @@ public interface BookieProtocol { return data != null; } - ChannelBuffer getData() { + ByteBuf getData() { return data; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 3c5f128..b1bc081 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -20,23 +20,22 @@ */ package org.apache.bookkeeper.proto; + +import java.nio.channels.ClosedChannelException; + import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.group.ChannelGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.channels.ClosedChannelException; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.ChannelGroup; /** * Serverside handler for bookkeeper requests */ -class BookieRequestHandler extends SimpleChannelHandler { +class BookieRequestHandler extends ChannelInboundHandlerAdapter { private final static Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class); private final RequestProcessor requestProcessor; @@ -48,42 +47,36 @@ class BookieRequestHandler extends SimpleChannelHandler { } @Override - public void channelOpen(ChannelHandlerContext ctx, - ChannelStateEvent e) - throws Exception { - allChannels.add(ctx.getChannel()); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel connected: {}", ctx.channel()); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - Throwable throwable = e.getCause(); - if (throwable instanceof ClosedChannelException) { - LOG.debug("Client died before request could be completed", throwable); - return; - } - LOG.error("Unhandled exception occurred in I/O thread or handler", throwable); + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + allChannels.add(ctx.channel()); } @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - LOG.debug("Channel connected {}", e); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channels disconnected: {}", ctx.channel()); } @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - LOG.debug("Channel disconnected {}", e); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof ClosedChannelException) { + LOG.info("Client died before request could be completed", cause); + return; + } + LOG.error("Unhandled exception occurred in I/O thread or handler", cause); + ctx.close(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - Object event = e.getMessage(); - if (!(event instanceof BookkeeperProtocol.Request || event instanceof BookieProtocol.Request)) { - ctx.sendUpstream(e); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof BookkeeperProtocol.Request || msg instanceof BookieProtocol.Request)) { + ctx.fireChannelRead(msg); return; } - requestProcessor.processRequest(event, ctx.getChannel()); + requestProcessor.processRequest(msg, ctx.channel()); } - } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 38f40f8..0746686 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -20,10 +20,10 @@ */ package org.apache.bookkeeper.proto; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; + +import io.netty.channel.Channel; + import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.AuthToken; @@ -33,7 +33,6 @@ import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +131,7 @@ public class BookieRequestProcessor implements RequestProcessor { processReadRequestV3(r, c); break; case AUTH: - LOG.info("Ignoring auth operation from client {}",c.getRemoteAddress()); + LOG.info("Ignoring auth operation from client {}",c.remoteAddress()); BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage .newBuilder() .setAuthPluginName(AuthProviderFactoryFactory.authenticationDisabledPluginName) @@ -142,7 +141,7 @@ public class BookieRequestProcessor implements RequestProcessor { BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EOK) .setAuthResponse(message); - c.write(authResponse.build()); + c.writeAndFlush(authResponse.build()); break; case WRITE_LAC: processWriteLacRequestV3(r,c); @@ -158,7 +157,7 @@ public class BookieRequestProcessor implements RequestProcessor { BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EBADREQ); - c.write(response.build()); + c.writeAndFlush(response.build()); if (statsEnabled) { bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); } @@ -176,7 +175,7 @@ public class BookieRequestProcessor implements RequestProcessor { break; default: LOG.error("Unknown op type {}, sending error", r.getOpCode()); - c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); + c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); if (statsEnabled) { bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java index b2d6d82..58fd451 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java @@ -21,6 +21,8 @@ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; + import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -28,7 +30,6 @@ import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.zookeeper.AsyncCallback; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +68,7 @@ public class BookkeeperInternalCallbacks { } public interface ReadLacCallback { - void readLacComplete(int rc, long ledgerId, ChannelBuffer lac, ChannelBuffer buffer, Object ctx); + void readLacComplete(int rc, long ledgerId, ByteBuf lac, ByteBuf buffer, Object ctx); } public interface WriteLacCallback { @@ -86,7 +87,7 @@ public class BookkeeperInternalCallbacks { */ public interface ReadEntryCallback { - void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx); + void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx); } public interface GetBookieInfoCallback { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java deleted file mode 100644 index 774086c..0000000 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ChannelManager.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.bookkeeper.proto; - -import java.io.IOException; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipelineFactory; - -/** - * Manages the lifycycle of a communication Channel - * @author enrico.olivelli - */ -public abstract class ChannelManager { - - /** - * Boots the Channel - * @param conf Bookie Configuration - * @param channelPipelineFactory Netty Pipeline Factory - * @param bookieAddress The actual address to listen on - * @return the channel which is listening for incoming connections - * @throws IOException - */ - public abstract Channel start(ServerConfiguration conf, ChannelPipelineFactory channelPipelineFactory) throws IOException; - - /** - * Releases all resources - */ - public abstract void close(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java index 2658634..36a26ef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -71,12 +72,12 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, } @Override - public void obtain(GenericCallback<PerChannelBookieClient> callback) { + public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) { if (1 == clients.length) { clients[0].connectIfNeededAndDoOp(callback); return; } - int idx = MathUtils.signSafeMod(counter.getAndIncrement(), clients.length); + int idx = MathUtils.signSafeMod(key, clients.length); clients[idx].connectIfNeededAndDoOp(callback); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java index 88c5eb1..1872701 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -28,10 +28,11 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.Channel; + public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private final static Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java deleted file mode 100644 index 91fde2c..0000000 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NioServerSocketChannelManager.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.bookkeeper.proto; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; -import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; - -/** - * Manages a NioServerSocketChannel channel - * - * @author enrico.olivelli - */ -public class NioServerSocketChannelManager extends ChannelManager { - - private ChannelFactory channelFactory; - - @Override - public Channel start(ServerConfiguration conf, ChannelPipelineFactory bookiePipelineFactory) throws IOException { - BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf); - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - String base = "bookie-" + conf.getBookiePort() + "-netty"; - this.channelFactory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(tfb.setNameFormat(base + "-boss-%d").build()), - Executors.newCachedThreadPool(tfb.setNameFormat(base + "-worker-%d").build())); - - ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); - bootstrap.setPipelineFactory(bookiePipelineFactory); - bootstrap.setOption("child.tcpNoDelay", conf.getServerTcpNoDelay()); - bootstrap.setOption("child.soLinger", 2); - - InetSocketAddress bindAddress; - if (conf.getListeningInterface() == null) { - // listen on all interfaces - bindAddress = new InetSocketAddress(conf.getBookiePort()); - } else { - bindAddress = bookieAddress.getSocketAddress(); - } - - Channel listen = bootstrap.bind(bindAddress); - return listen; - } - - @Override - public void close() { - if (channelFactory != null) { - channelFactory.releaseExternalResources(); - } - channelFactory = null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index 681f6c6..4f14dcf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -23,10 +23,11 @@ import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.Channel; + abstract class PacketProcessorBase extends SafeRunnable { private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class); final Request request; @@ -55,7 +56,7 @@ abstract class PacketProcessorBase extends SafeRunnable { } protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) { - channel.write(response); + channel.writeAndFlush(response, channel.voidPromise()); if (BookieProtocol.EOK == rc) { statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); } else { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index 85ec6cb..873ef30 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.proto; +import io.netty.channel.Channel; + import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; @@ -29,7 +31,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.channel.Channel; public abstract class PacketProcessorBaseV3 extends SafeRunnable { @@ -47,7 +48,7 @@ public abstract class PacketProcessorBaseV3 extends SafeRunnable { } protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) { - channel.write(response); + channel.writeAndFlush(response); if (StatusCode.EOK == code) { statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); } else {
