BOOKKEEPER-1008: Netty 4.1. Added more ref-count fixes from the yahoo-4.3 branch on top of #116.
Author: Matteo Merli <[email protected]> Author: Matteo Merli <[email protected]> Author: Kishore Kasi Udayashankar <[email protected]> Reviewers: Jia Zhai <[email protected]>, Sijie Guo <[email protected]> Closes #138 from merlimat/netty-4.1 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/74f79513 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/74f79513 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/74f79513 Branch: refs/heads/master Commit: 74f795136c1fff3badb29fc982d0cc2d43096b45 Parents: 811ece5 Author: Matteo Merli <[email protected]> Authored: Mon May 15 10:52:53 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Mon May 15 10:52:53 2017 -0700 ---------------------------------------------------------------------- bookkeeper-benchmark/pom.xml | 20 +- .../bookkeeper/benchmark/BenchBookie.java | 38 +- bookkeeper-server/pom.xml | 46 +- .../LocalBookieEnsemblePlacementPolicy.java | 5 +- .../apache/bookkeeper/client/BookKeeper.java | 85 ++-- .../bookkeeper/client/CRC32DigestManager.java | 14 +- .../client/DefaultEnsemblePlacementPolicy.java | 3 +- .../apache/bookkeeper/client/DigestManager.java | 116 +++-- .../client/EnsemblePlacementPolicy.java | 3 +- .../client/ExplicitLacFlushPolicy.java | 6 +- .../apache/bookkeeper/client/LedgerChecker.java | 7 +- .../apache/bookkeeper/client/LedgerEntry.java | 5 +- .../client/LedgerFragmentReplicator.java | 6 +- .../apache/bookkeeper/client/LedgerHandle.java | 31 +- .../bookkeeper/client/LedgerHandleAdv.java | 6 +- .../bookkeeper/client/MacDigestManager.java | 10 +- .../apache/bookkeeper/client/PendingAddOp.java | 34 +- .../bookkeeper/client/PendingReadLacOp.java | 9 +- .../apache/bookkeeper/client/PendingReadOp.java | 11 +- .../bookkeeper/client/PendingWriteLacOp.java | 7 +- .../RackawareEnsemblePlacementPolicy.java | 3 +- .../RackawareEnsemblePlacementPolicyImpl.java | 3 +- 
.../bookkeeper/client/ReadLastConfirmedOp.java | 8 +- .../RegionAwareEnsemblePlacementPolicy.java | 2 +- .../client/TryReadLastConfirmedOp.java | 5 +- .../bookkeeper/conf/ClientConfiguration.java | 24 + .../bookkeeper/conf/ServerConfiguration.java | 113 ++++- .../bookkeeper/net/BookieSocketAddress.java | 3 +- .../net/StabilizeNetworkTopology.java | 7 +- .../bookkeeper/processor/RequestProcessor.java | 2 +- .../apache/bookkeeper/proto/AuthHandler.java | 141 +++--- .../apache/bookkeeper/proto/BookieClient.java | 73 +-- .../bookkeeper/proto/BookieNettyServer.java | 281 ++++++++--- .../bookkeeper/proto/BookieProtoEncoding.java | 186 ++++---- .../apache/bookkeeper/proto/BookieProtocol.java | 23 +- .../bookkeeper/proto/BookieRequestHandler.java | 55 +-- .../proto/BookieRequestProcessor.java | 15 +- .../proto/BookkeeperInternalCallbacks.java | 7 +- .../apache/bookkeeper/proto/ChannelManager.java | 46 -- .../DefaultPerChannelBookieClientPool.java | 5 +- .../proto/GetBookieInfoProcessorV3.java | 3 +- .../proto/NioServerSocketChannelManager.java | 75 --- .../bookkeeper/proto/PacketProcessorBase.java | 5 +- .../bookkeeper/proto/PacketProcessorBaseV3.java | 5 +- .../proto/PerChannelBookieClient.java | 359 +++++++------- .../proto/PerChannelBookieClientPool.java | 2 +- .../bookkeeper/proto/ReadEntryProcessor.java | 5 +- .../bookkeeper/proto/ReadEntryProcessorV3.java | 9 +- .../bookkeeper/proto/ReadLacProcessorV3.java | 5 +- .../bookkeeper/proto/ResponseBuilder.java | 5 +- .../bookkeeper/proto/VMLocalChannelManager.java | 63 --- .../bookkeeper/proto/WriteEntryProcessor.java | 6 +- .../bookkeeper/proto/WriteEntryProcessorV3.java | 3 +- .../bookkeeper/proto/WriteLacProcessorV3.java | 5 +- .../apache/bookkeeper/util/DoubleByteBuf.java | 468 +++++++++++++++++++ .../bookie/BookieInitializationTest.java | 11 +- .../bookkeeper/bookie/BookieJournalTest.java | 48 +- .../apache/bookkeeper/bookie/UpgradeTest.java | 10 +- .../bookkeeper/client/BookKeeperTest.java | 2 +- 
.../bookkeeper/client/BookKeeperTestClient.java | 13 - .../bookkeeper/client/BookieRecoveryTest.java | 5 +- .../apache/bookkeeper/client/ClientUtil.java | 6 +- .../bookkeeper/client/SlowBookieTest.java | 1 + .../client/TestGetBookieInfoTimeout.java | 18 +- .../TestRackawareEnsemblePlacementPolicy.java | 4 +- ...awareEnsemblePlacementPolicyUsingScript.java | 3 +- .../TestRegionAwareEnsemblePlacementPolicy.java | 4 +- .../bookkeeper/proto/NetworkLessBookieTest.java | 8 +- .../proto/TestBackwardCompatCMS42.java | 63 +-- .../proto/TestPerChannelBookieClient.java | 51 +- .../test/BookKeeperClusterTestCase.java | 1 + .../bookkeeper/test/BookieClientTest.java | 50 +- .../bookkeeper/test/BookieReadWriteTest.java | 2 +- .../apache/bookkeeper/test/LoopbackClient.java | 19 +- .../bookkeeper/util/DoubleByteBufTest.java | 121 +++++ pom.xml | 2 +- 76 files changed, 1865 insertions(+), 1059 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/bookkeeper-benchmark/pom.xml b/bookkeeper-benchmark/pom.xml index dac28b0..4354656 100644 --- a/bookkeeper-benchmark/pom.xml +++ b/bookkeeper-benchmark/pom.xml @@ -87,10 +87,28 @@ <version>${zookeeper.version}</version> <type>test-jar</type> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-all</artifactId> <version>${netty.version}</version> 
<scope>compile</scope> </dependency> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java index 94ffd02..7ccd034 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java @@ -20,7 +20,6 @@ package org.apache.bookkeeper.benchmark; import java.io.IOException; -import java.util.concurrent.Executors; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; @@ -32,21 +31,23 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.SystemUtils; import org.apache.commons.cli.ParseException; import org.apache.zookeeper.KeeperException; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + public class BenchBookie { static final Logger LOG 
= LoggerFactory.getLogger(BenchBookie.class); @@ -137,18 +138,25 @@ public class BenchBookie { int size = Integer.parseInt(cmd.getOptionValue("size", "1024")); String servers = cmd.getOptionValue("zookeeper", "localhost:2181"); + EventLoopGroup eventLoop; + if (SystemUtils.IS_OS_LINUX) { + try { + eventLoop = new EpollEventLoopGroup(); + } catch (Throwable t) { + LOG.warn("Could not use Netty Epoll event loop for benchmark {}", t.getMessage()); + eventLoop = new NioEventLoopGroup(); + } + } else { + eventLoop = new NioEventLoopGroup(); + } - - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors - .newCachedThreadPool()); OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() .name("BenchBookieClientScheduler") .numThreads(1) .build(); ClientConfiguration conf = new ClientConfiguration(); - BookieClient bc = new BookieClient(conf, channelFactory, executor); + BookieClient bc = new BookieClient(conf, eventLoop, executor); LatencyCallback lc = new LatencyCallback(); ThroughputCallback tc = new ThroughputCallback(); @@ -156,7 +164,7 @@ public class BenchBookie { long ledger = getValidLedgerId(servers); for(long entry = 0; entry < warmUpCount; entry++) { - ChannelBuffer toSend = ChannelBuffers.buffer(size); + ByteBuf toSend = Unpooled.buffer(size); toSend.resetReaderIndex(); toSend.resetWriterIndex(); toSend.writeLong(ledger); @@ -173,7 +181,7 @@ public class BenchBookie { int entryCount = 5000; long startTime = System.nanoTime(); for(long entry = 0; entry < entryCount; entry++) { - ChannelBuffer toSend = ChannelBuffers.buffer(size); + ByteBuf toSend = Unpooled.buffer(size); toSend.resetReaderIndex(); toSend.resetWriterIndex(); toSend.writeLong(ledger); @@ -194,7 +202,7 @@ public class BenchBookie { startTime = System.currentTimeMillis(); tc = new ThroughputCallback(); for(long entry = 0; entry < entryCount; entry++) { - ChannelBuffer toSend = ChannelBuffers.buffer(size); + ByteBuf toSend = 
Unpooled.buffer(size); toSend.resetReaderIndex(); toSend.resetWriterIndex(); toSend.writeLong(ledger); @@ -208,7 +216,7 @@ public class BenchBookie { LOG.info("Throughput: " + ((long)entryCount)*1000/(endTime-startTime)); bc.close(); - channelFactory.releaseExternalResources(); + eventLoop.shutdownGracefully(); executor.shutdown(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/pom.xml ---------------------------------------------------------------------- diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index be4249f..99705f4 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -23,7 +23,6 @@ <groupId>org.apache.bookkeeper</groupId> <version>4.5.0-SNAPSHOT</version> </parent> - <groupId>org.apache.bookkeeper</groupId> <artifactId>bookkeeper-server</artifactId> <name>bookkeeper-server</name> <url>http://maven.apache.org</url> @@ -74,6 +73,22 @@ <groupId>net.java.dev.javacc</groupId> <artifactId>javacc</artifactId> </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -82,12 +97,24 @@ <version>${zookeeper.version}</version> <type>test-jar</type> <scope>test</scope> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - <version>${netty.version}</version> - <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + 
<exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.commons</groupId> @@ -184,6 +211,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>${netty.version}</version> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java index 00ac0d0..3969f41 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import java.net.UnknownHostException; import java.util.*; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; @@ -32,14 +31,14 @@ import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.configuration.Configuration; import com.google.common.collect.Lists; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.HashedWheelTimer; + /** * Special ensemble placement policy that always return 
local bookie. Only works with ledgers with ensemble=1. */ http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index a42db17..fd25361 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -20,12 +20,18 @@ */ package org.apache.bookkeeper.client; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; + import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,11 +63,9 @@ import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.lang.SystemUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +92,7 @@ public class BookKeeper implements AutoCloseable { static final Logger LOG = 
LoggerFactory.getLogger(BookKeeper.class); final ZooKeeper zk; - final ClientSocketChannelFactory channelFactory; + final EventLoopGroup eventLoopGroup; // The stats logger for this client. private final StatsLogger statsLogger; @@ -101,9 +105,9 @@ public class BookKeeper implements AutoCloseable { private OpStatsLogger readLacOpLogger; - // whether the socket factory is one we created, or is owned by whoever + // whether the event loop group is one we created, or is owned by whoever // instantiated us - boolean ownChannelFactory = false; + boolean ownEventLoopGroup = false; // whether the zk handle is one we created, or is owned by whoever // instantiated us boolean ownZKHandle = false; @@ -138,7 +142,7 @@ public class BookKeeper implements AutoCloseable { final ClientConfiguration conf; ZooKeeper zk = null; - ClientSocketChannelFactory channelFactory = null; + EventLoopGroup eventLoopGroup = null; StatsLogger statsLogger = NullStatsLogger.INSTANCE; DNSToSwitchMapping dnsResolver = null; HashedWheelTimer requestTimer = null; @@ -148,8 +152,8 @@ public class BookKeeper implements AutoCloseable { this.conf = conf; } - public Builder setChannelFactory(ClientSocketChannelFactory f) { - channelFactory = f; + public Builder setEventLoopGroup(EventLoopGroup f) { + eventLoopGroup = f; return this; } @@ -181,7 +185,7 @@ public class BookKeeper implements AutoCloseable { public BookKeeper build() throws IOException, InterruptedException, KeeperException { Preconditions.checkNotNull(statsLogger, "No stats logger provided"); - return new BookKeeper(conf, zk, channelFactory, statsLogger, dnsResolver, requestTimer, featureProvider); + return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider); } } @@ -190,7 +194,7 @@ public class BookKeeper implements AutoCloseable { } /** - * Create a bookkeeper client. A zookeeper client and a client socket factory + * Create a bookkeeper client. 
A zookeeper client and a client event loop group * will be instantiated as part of this constructor. * * @param servers @@ -209,7 +213,7 @@ public class BookKeeper implements AutoCloseable { /** * Create a bookkeeper client using a configuration object. - * A zookeeper client and a client socket factory will be + * A zookeeper client and a client event loop group will be * instantiated as part of this constructor. * * @param conf @@ -229,10 +233,10 @@ public class BookKeeper implements AutoCloseable { return zk; } - private static ClientSocketChannelFactory validateChannelFactory(ClientSocketChannelFactory factory) + private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGroup) throws NullPointerException { - Preconditions.checkNotNull(factory, "No Channel Factory provided"); - return factory; + Preconditions.checkNotNull(eventLoopGroup, "No Event Loop Group provided"); + return eventLoopGroup; } /** @@ -257,7 +261,7 @@ public class BookKeeper implements AutoCloseable { /** * Create a bookkeeper client but use the passed in zookeeper client and - * client socket channel factory instead of instantiating those. + * client event loop group instead of instantiating those. * * @param conf * Client Configuration Object @@ -266,15 +270,15 @@ public class BookKeeper implements AutoCloseable { * Zookeeper client instance connected to the zookeeper with which * the bookies have registered. The ZooKeeper client must be connected * before it is passed to BookKeeper. Otherwise a KeeperException is thrown. 
- * @param channelFactory - * A factory that will be used to create connections to the bookies + * @param eventLoopGroup + * An event loop group that will be used to create connections to the bookies * @throws IOException * @throws InterruptedException * @throws KeeperException if the passed zk handle is not connected */ - public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory) + public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup) throws IOException, InterruptedException, KeeperException { - this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE, + this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE, null, null, null); } @@ -283,7 +287,7 @@ public class BookKeeper implements AutoCloseable { */ private BookKeeper(ClientConfiguration conf, ZooKeeper zkc, - ClientSocketChannelFactory channelFactory, + EventLoopGroup eventLoopGroup, StatsLogger statsLogger, DNSToSwitchMapping dnsResolver, HashedWheelTimer requestTimer, @@ -310,18 +314,13 @@ public class BookKeeper implements AutoCloseable { this.ownZKHandle = false; } - // initialize channel factory - if (null == channelFactory) { - ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); - this.channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(tfb.setNameFormat( - "BookKeeper-NIOBoss-%d").build()), - Executors.newCachedThreadPool(tfb.setNameFormat( - "BookKeeper-NIOWorker-%d").build())); - this.ownChannelFactory = true; + // initialize event loop group + if (null == eventLoopGroup) { + this.eventLoopGroup = getDefaultEventLoopGroup(); + this.ownEventLoopGroup = true; } else { - this.channelFactory = channelFactory; - this.ownChannelFactory = false; + this.eventLoopGroup = eventLoopGroup; + this.ownEventLoopGroup = false; } if (null == requestTimer) { @@ -365,7 +364,7 @@ public class BookKeeper implements 
AutoCloseable { .build(); // initialize bookie client - this.bookieClient = new BookieClient(conf, this.channelFactory, this.mainWorkerPool, statsLogger); + this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, statsLogger); this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this); if (conf.getDiskWeightBasedPlacementEnabled()) { LOG.info("Weighted ledger placement enabled"); @@ -1178,8 +1177,8 @@ public class BookKeeper implements AutoCloseable { if (ownTimer) { requestTimer.stop(); } - if (ownChannelFactory) { - channelFactory.releaseExternalResources(); + if (ownEventLoopGroup) { + eventLoopGroup.shutdownGracefully(); } if (ownZKHandle) { zk.close(); @@ -1255,4 +1254,20 @@ public class BookKeeper implements AutoCloseable { OpStatsLogger getAddOpLogger() { return addOpLogger; } OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; } OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; } + + static EventLoopGroup getDefaultEventLoopGroup() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("bookkeeper-io-%s").build(); + final int numThreads = Runtime.getRuntime().availableProcessors() * 2; + + if (SystemUtils.IS_OS_LINUX) { + try { + return new EpollEventLoopGroup(numThreads, threadFactory); + } catch (Throwable t) { + LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", t.getMessage()); + return new NioEventLoopGroup(numThreads, threadFactory); + } + } else { + return new NioEventLoopGroup(numThreads, threadFactory); + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java index 
9194bf9..25a0f61 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java @@ -19,7 +19,8 @@ package org.apache.bookkeeper.client; */ -import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; + import java.util.zip.CRC32; class CRC32DigestManager extends DigestManager { @@ -40,16 +41,13 @@ class CRC32DigestManager extends DigestManager { } @Override - byte[] getValueAndReset() { - byte[] value = new byte[8]; - ByteBuffer buf = ByteBuffer.wrap(value); - buf.putLong(crc.get().getValue()); + void populateValueAndReset(ByteBuf buf) { + buf.writeLong(crc.get().getValue()); crc.get().reset(); - return value; } @Override - void update(byte[] data, int offset, int length) { - crc.get().update(data, offset, length); + void update(ByteBuf data) { + crc.get().update(data.nioBuffer()); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java index 2b13a29..7c3d46f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java @@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.base.Optional; +import io.netty.util.HashedWheelTimer; + import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject; @@ -37,7 +39,6 @@ 
import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.collections.CollectionUtils; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java index c72f31a..396e6d9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java @@ -18,16 +18,18 @@ package org.apache.bookkeeper.client; * limitations under the License. */ -import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + import java.security.GeneralSecurityException; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.util.DoubleByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.buffer.ChannelBuffers; /** * This class takes an entry, attaches a digest to it and packages it with relevant @@ -47,11 +49,12 @@ abstract class DigestManager { abstract int getMacCodeLength(); void update(byte[] data) { - update(data, 0, data.length); + update(Unpooled.wrappedBuffer(data, 0, data.length)); } - abstract void update(byte[] data, int offset, int length); - abstract byte[] 
getValueAndReset(); + abstract void update(ByteBuf buffer); + + abstract void populateValueAndReset(ByteBuf buffer); final int macCodeLength; @@ -81,26 +84,21 @@ abstract class DigestManager { * @return */ - public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) { - - byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength]; - ByteBuffer buffer = ByteBuffer.wrap(bufferArray); - buffer.putLong(ledgerId); - buffer.putLong(entryId); - buffer.putLong(lastAddConfirmed); - buffer.putLong(length); - buffer.flip(); + public ByteBuf computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, + int doffset, int dlength) { + ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength); + headersBuffer.writeLong(ledgerId); + headersBuffer.writeLong(entryId); + headersBuffer.writeLong(lastAddConfirmed); + headersBuffer.writeLong(length); - update(buffer.array(), 0, METADATA_LENGTH); - update(data, doffset, dlength); - byte[] digest = getValueAndReset(); + ByteBuf dataBuffer = Unpooled.wrappedBuffer(data, doffset, dlength); - buffer.limit(buffer.capacity()); - buffer.position(METADATA_LENGTH); - buffer.put(digest); - buffer.flip(); + update(headersBuffer); + update(dataBuffer); + populateValueAndReset(headersBuffer); - return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength)); + return DoubleByteBuf.get(headersBuffer, dataBuffer); } /** @@ -110,39 +108,28 @@ abstract class DigestManager { * @return */ - public ChannelBuffer computeDigestAndPackageForSendingLac(long lac) { + public ByteBuf computeDigestAndPackageForSendingLac(long lac) { + ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength); + headersBuffer.writeLong(ledgerId); + headersBuffer.writeLong(lac); - byte[] bufferArray = new 
byte[LAC_METADATA_LENGTH + macCodeLength]; - ByteBuffer buffer = ByteBuffer.wrap(bufferArray); - buffer.putLong(ledgerId); - buffer.putLong(lac); - buffer.flip(); + update(headersBuffer); + populateValueAndReset(headersBuffer); - update(buffer.array(), 0, LAC_METADATA_LENGTH); - byte[] digest = getValueAndReset(); - - buffer.limit(buffer.capacity()); - buffer.position(LAC_METADATA_LENGTH); - buffer.put(digest); - buffer.flip(); - - return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer)); + return headersBuffer; } - private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException { + private void verifyDigest(ByteBuf dataReceived) throws BKDigestMatchException { verifyDigest(LedgerHandle.INVALID_ENTRY_ID, dataReceived, true); } - private void verifyDigest(long entryId, ChannelBuffer dataReceived) throws BKDigestMatchException { + private void verifyDigest(long entryId, ByteBuf dataReceived) throws BKDigestMatchException { verifyDigest(entryId, dataReceived, false); } - private void verifyDigest(long entryId, ChannelBuffer dataReceived, boolean skipEntryIdCheck) + private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryIdCheck) throws BKDigestMatchException { - ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer(); - byte[] digest; - if ((METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) { logger.error("Data received is smaller than the minimum for this digest type. " + " Either the packet it corrupt, or the wrong digest is configured. 
" @@ -150,17 +137,21 @@ abstract class DigestManager { this.getClass().getName(), dataReceived.readableBytes()); throw new BKDigestMatchException(); } - update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), METADATA_LENGTH); + update(dataReceived.slice(0, METADATA_LENGTH)); int offset = METADATA_LENGTH + macCodeLength; - update(dataReceivedBuffer.array(), dataReceivedBuffer.position() + offset, dataReceived.readableBytes() - offset); - digest = getValueAndReset(); + update(dataReceived.slice(offset, dataReceived.readableBytes() - offset)); - for (int i = 0; i < digest.length; i++) { - if (digest[i] != dataReceived.getByte(METADATA_LENGTH + i)) { + ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength); + populateValueAndReset(digest); + + try { + if (digest.compareTo(dataReceived.slice(METADATA_LENGTH, macCodeLength)) != 0) { logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId); throw new BKDigestMatchException(); } + } finally { + digest.release(); } long actualLedgerId = dataReceived.readLong(); @@ -180,9 +171,7 @@ abstract class DigestManager { } - long verifyDigestAndReturnLac(ChannelBuffer dataReceived) throws BKDigestMatchException{ - ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer(); - byte[] digest; + long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchException{ if ((LAC_METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) { logger.error("Data received is smaller than the minimum for this digest type." + " Either the packet it corrupt, or the wrong digest is configured. 
" @@ -190,14 +179,21 @@ abstract class DigestManager { this.getClass().getName(), dataReceived.readableBytes()); throw new BKDigestMatchException(); } - update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), LAC_METADATA_LENGTH); - digest = getValueAndReset(); - for (int i = 0; i < digest.length; i++) { - if (digest[i] != dataReceived.getByte(LAC_METADATA_LENGTH + i)) { + + update(dataReceived.slice(0, LAC_METADATA_LENGTH)); + + ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength); + try { + populateValueAndReset(digest); + + if (digest.compareTo(dataReceived.slice(LAC_METADATA_LENGTH, macCodeLength)) != 0) { logger.error("Mac mismatch for ledger-id LAC: " + ledgerId); throw new BKDigestMatchException(); } + } finally { + digest.release(); } + long actualLedgerId = dataReceived.readLong(); long lac = dataReceived.readLong(); if (actualLedgerId != ledgerId) { @@ -216,11 +212,11 @@ abstract class DigestManager { * @return * @throws BKDigestMatchException */ - ChannelBufferInputStream verifyDigestAndReturnData(long entryId, ChannelBuffer dataReceived) + ByteBufInputStream verifyDigestAndReturnData(long entryId, ByteBuf dataReceived) throws BKDigestMatchException { verifyDigest(entryId, dataReceived); dataReceived.readerIndex(METADATA_LENGTH + macCodeLength); - return new ChannelBufferInputStream(dataReceived); + return new ByteBufInputStream(dataReceived); } static class RecoveryData { @@ -234,7 +230,7 @@ abstract class DigestManager { } - RecoveryData verifyDigestAndReturnLastConfirmed(ChannelBuffer dataReceived) throws BKDigestMatchException { + RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) throws BKDigestMatchException { verifyDigest(dataReceived); dataReceived.readerIndex(8); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java index d2e16e8..af49fa1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java @@ -25,6 +25,8 @@ import java.util.Set; import com.google.common.base.Optional; +import io.netty.util.HashedWheelTimer; + import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -32,7 +34,6 @@ import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.util.HashedWheelTimer; /** * Encapsulation of the algorithm that selects a number of bookies from the cluster as an ensemble for storing http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java index e452b24..d4a12e6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java @@ -26,10 +26,11 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.LedgerHandle.LastAddConfirmedCallback; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + interface ExplicitLacFlushPolicy { void stopExplicitLacFlush(); @@ -129,8 +130,7 @@ interface ExplicitLacFlushPolicy { lh.bk.mainWorkerPool.submit(new SafeRunnable() { @Override public void safeRun() { - ChannelBuffer toSend = lh.macManager - .computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed()); + ByteBuf toSend = lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed()); op.initiate(toSend); } }); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java index 3f2580f..4266c90 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java @@ -19,6 +19,8 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; + import java.util.ArrayList; import java.util.HashSet; import java.util.Map; @@ -31,7 +33,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ public class LedgerChecker { } public void readEntryComplete(int rc, long ledgerId, long entryId, - ChannelBuffer buffer, Object ctx) { + ByteBuf buffer, Object ctx) { if (rc == BKException.Code.OK) { if (numEntries.decrementAndGet() == 0 && !completed.getAndSet(true)) { @@ -126,7 +127,7 @@ public class 
LedgerChecker { } public void readEntryComplete(int rc, long ledgerId, long entryId, - ChannelBuffer buffer, Object ctx) { + ByteBuf buffer, Object ctx) { if (BKException.Code.NoSuchEntryException != rc && BKException.Code.NoSuchLedgerExistsException != rc) { entryMayExist.set(true); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java index 91f897c..6502e05 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java @@ -21,12 +21,13 @@ package org.apache.bookkeeper.client; * */ +import io.netty.buffer.ByteBufInputStream; + import java.io.IOException; import java.io.InputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.jboss.netty.buffer.ChannelBufferInputStream; /** * Ledger entry. 
Its a simple tuple containing the ledger id, the entry-id, and @@ -40,7 +41,7 @@ public class LedgerEntry { long ledgerId; long entryId; long length; - ChannelBufferInputStream entryDataStream; + ByteBufInputStream entryDataStream; LedgerEntry(long lId, long eId) { this.ledgerId = lId; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 22241e6..0522d50 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -19,6 +19,8 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; + import java.util.ArrayList; import java.util.Enumeration; import java.util.HashSet; @@ -40,8 +42,6 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException.Code; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -281,7 +281,7 @@ public class LedgerFragmentReplicator { final long dataLength = data.length; numEntriesRead.inc(); numBytesRead.registerSuccessfulValue(dataLength); - ChannelBuffer toSend = lh.getDigestManager() + ByteBuf toSend = lh.getDigestManager() .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), data, 0, data.length); 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index d1e5540..4ae4f48 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.client; import static com.google.common.base.Charsets.UTF_8; +import io.netty.buffer.ByteBuf; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -51,7 +52,6 @@ import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +110,7 @@ public class LedgerHandle implements AutoCloseable { this.ledgerId = ledgerId; - if (bk.getConf().getThrottleValue() > 0) { + if (bk.getConf().getThrottleValue() > 0) { this.throttler = RateLimiter.create(bk.getConf().getThrottleValue()); } else { this.throttler = null; @@ -285,7 +285,7 @@ public class LedgerHandle implements AutoCloseable { asyncClose(new SyncCloseCallback(), counter); explicitLacFlushPolicy.stopExplicitLacFlush(); - + SynchCallbackUtils.waitForResult(counter); } @@ -811,12 +811,16 @@ public class LedgerHandle implements AutoCloseable { } try { - bk.mainWorkerPool.submit(new SafeRunnable() { + bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { - ChannelBuffer toSend = macManager.computeDigestAndPackageForSending( - entryId, lastAddConfirmed, currentLength, data, 
offset, length); - op.initiate(toSend, length); + ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed, + currentLength, data, offset, length); + try { + op.initiate(toSend, length); + } finally { + toSend.release(); + } } @Override public String toString() { @@ -1024,7 +1028,7 @@ public class LedgerHandle implements AutoCloseable { * returns the value of the last add confirmed from the metadata. * * @see #getLastAddConfirmed() - * + * * @param cb * callback to return read explicit last confirmed * @param ctx @@ -1049,7 +1053,7 @@ public class LedgerHandle implements AutoCloseable { @Override public void getLacComplete(int rc, long lac) { if (rc == BKException.Code.OK) { - // here we are trying to update lac only but not length + // here we are trying to update lac only but not length updateLastConfirmed(lac, 0); cb.readLastConfirmedComplete(rc, lac, ctx); } else { @@ -1133,6 +1137,7 @@ public class LedgerHandle implements AutoCloseable { while ((pendingAddOp = pendingAddOps.peek()) != null && blockAddCompletions.get() == 0) { if (!pendingAddOp.completed) { + LOG.debug("pending add not completed: {}", pendingAddOp); return; } // Check if it is the next entry in the sequence. @@ -1183,9 +1188,9 @@ public class LedgerHandle implements AutoCloseable { void handleBookieFailure(final BookieSocketAddress addr, final int bookieIndex) { // If this is the first failure, - // try to submit completed pendingAddOps before this failure. + // try to submit completed pendingAddOps before this failure. if (0 == blockAddCompletions.get()) { - sendAddSuccessCallbacks(); + sendAddSuccessCallbacks(); } blockAddCompletions.incrementAndGet(); @@ -1197,9 +1202,9 @@ public class LedgerHandle implements AutoCloseable { addr, bookieIndex); blockAddCompletions.decrementAndGet(); - // Try to submit completed pendingAddOps, pending by this fix. + // Try to submit completed pendingAddOps, pending by this fix. 
if (0 == blockAddCompletions.get()) { - sendAddSuccessCallbacks(); + sendAddSuccessCallbacks(); } return; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 4cdcdca..ffc469e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -32,10 +32,11 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + /** * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add entries with * user supplied entryIds. 
Through this interface Ledger Length may not be accurate wile the @@ -213,9 +214,10 @@ public class LedgerHandleAdv extends LedgerHandle { bk.mainWorkerPool.submit(new SafeRunnable() { @Override public void safeRun() { - ChannelBuffer toSend = macManager.computeDigestAndPackageForSending( + ByteBuf toSend = macManager.computeDigestAndPackageForSending( op.getEntryId(), lastAddConfirmed, currentLength, data, offset, length); op.initiate(toSend, length); + toSend.release(); } }); } catch (RejectedExecutionException e) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java index df09d52..920143f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java @@ -18,6 +18,8 @@ package org.apache.bookkeeper.client; * limitations under the License. 
*/ +import io.netty.buffer.ByteBuf; + import java.security.GeneralSecurityException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -73,13 +75,13 @@ class MacDigestManager extends DigestManager { @Override - byte[] getValueAndReset() { - return mac.get().doFinal(); + void populateValueAndReset(ByteBuf buffer) { + buffer.writeBytes(mac.get().doFinal()); } @Override - void update(byte[] data, int offset, int length) { - mac.get().update(data, offset, length); + void update(ByteBuf data) { + mac.get().update(data.nioBuffer()); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 1946069..9407189 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -17,6 +17,11 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -28,9 +33,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.RejectedExecutionException; @@ -48,7 +50,7 @@ import java.util.concurrent.TimeUnit; class PendingAddOp 
implements WriteCallback, TimerTask { private final static Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); - ChannelBuffer toSend; + ByteBuf toSend; AddCallback cb; Object ctx; long entryId; @@ -66,6 +68,7 @@ class PendingAddOp implements WriteCallback, TimerTask { Timeout timeout = null; OpStatsLogger addOpLogger; + boolean callbackTriggered = false; PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) { this.lh = lh; @@ -153,6 +156,10 @@ class PendingAddOp implements WriteCallback, TimerTask { return; } + if (callbackTriggered) { + return; + } + if (LOG.isDebugEnabled()) { LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: " + bookieIndex); @@ -167,12 +174,20 @@ class PendingAddOp implements WriteCallback, TimerTask { sendWriteRequest(bookieIndex); } - void initiate(ChannelBuffer toSend, int entryLength) { + void initiate(ByteBuf toSend, int entryLength) { + if (callbackTriggered) { + // this should only be true if the request was failed due to another request ahead in the pending queue, + // so we can just ignore this request + return; + } + if (timeoutSec > -1) { this.timeout = lh.bk.bookieClient.scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS); } this.requestTimeNanos = MathUtils.nowInNano(); this.toSend = toSend; + // Retain the buffer until all writes are complete + this.toSend.retain(); this.entryLength = entryLength; for (int bookieIndex : writeSet) { sendWriteRequest(bookieIndex); @@ -233,6 +248,14 @@ class PendingAddOp implements WriteCallback, TimerTask { if (null != timeout) { timeout.cancel(); } + + + ReferenceCountUtil.release(toSend); + + if (LOG.isDebugEnabled()) { + LOG.debug("Submit callback (lid:{}, eid: {}). 
rc:{}", new Object[] { lh.getId(), entryId, rc }); + } + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); if (rc != BKException.Code.OK) { addOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); @@ -242,6 +265,7 @@ class PendingAddOp implements WriteCallback, TimerTask { addOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); } cb.addComplete(rc, lh, entryId, ctx); + callbackTriggered = true; } @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java index 64e266f..5b48461 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java @@ -19,13 +19,11 @@ package org.apache.bookkeeper.client; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.DigestManager.RecoveryData; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.jboss.netty.buffer.ChannelBuffer; + +import io.netty.buffer.ByteBuf; /** * This represents a pending ReadLac operation. 
@@ -74,7 +72,8 @@ class PendingReadLacOp implements ReadLacCallback { } @Override - public void readLacComplete(int rc, long ledgerId, final ChannelBuffer lacBuffer, final ChannelBuffer lastEntryBuffer, Object ctx) { + public void readLacComplete(int rc, long ledgerId, final ByteBuf lacBuffer, final ByteBuf lastEntryBuffer, + Object ctx) { int bookieIndex = (Integer) ctx; numResponsesPending--; boolean heardValidResponse = false; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index cafe8f7..57a84f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -20,6 +20,9 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; + import java.util.ArrayList; import java.util.BitSet; import java.util.Enumeration; @@ -41,8 +44,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,8 +222,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { // return true if we managed to complete the entry // return false if the read entry is not complete or it is already completed before - boolean complete(BookieSocketAddress host, final ChannelBuffer buffer) { - ChannelBufferInputStream is; + 
boolean complete(BookieSocketAddress host, final ByteBuf buffer) { + ByteBufInputStream is; try { is = lh.macManager.verifyDigestAndReturnData(entryId, buffer); } catch (BKDigestMatchException e) { @@ -352,7 +353,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } @Override - public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { final ReadContext rctx = (ReadContext)ctx; final LedgerEntryRequest entry = rctx.entry; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java index dc7368b..45c3898 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java @@ -25,10 +25,11 @@ import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; import org.apache.bookkeeper.stats.OpStatsLogger; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + /** * This represents a pending WriteLac operation. 
When it has got * success from Ack Quorum bookies, sends success back to the application, @@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory; */ class PendingWriteLacOp implements WriteLacCallback { private final static Logger LOG = LoggerFactory.getLogger(PendingWriteLacOp.class); - ChannelBuffer toSend; + ByteBuf toSend; AddLacCallback cb; long lac; Object ctx; @@ -74,7 +75,7 @@ class PendingWriteLacOp implements WriteLacCallback { lac, toSend, this, bookieIndex); } - void initiate(ChannelBuffer toSend) { + void initiate(ByteBuf toSend) { this.toSend = toSend; for (int bookieIndex: writeSet) { sendWriteLacRequest(bookieIndex); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java index 7272447..a72e2ca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java @@ -26,7 +26,8 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.Node; import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.util.HashedWheelTimer; + +import io.netty.util.HashedWheelTimer; public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicyImpl implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java 
---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 8d56f7a..b5fdfed 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -48,7 +48,6 @@ import org.apache.bookkeeper.net.StabilizeNetworkTopology; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.commons.collections.CollectionUtils; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +55,8 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import io.netty.util.HashedWheelTimer; + /** * Simple rackware ensemble placement policy. 
* http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java index af21f44..e95a527 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java @@ -17,13 +17,15 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; + import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.DigestManager.RecoveryData; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookieProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.jboss.netty.buffer.ChannelBuffer; /** * This class encapsulated the read last confirmed operation. 
@@ -75,7 +77,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback { } public synchronized void readEntryComplete(final int rc, final long ledgerId, final long entryId, - final ChannelBuffer buffer, final Object ctx) { + final ByteBuf buffer, final Object ctx) { int bookieIndex = (Integer) ctx; numResponsesPending--; @@ -96,6 +98,8 @@ class ReadLastConfirmedOp implements ReadEntryCallback { } } + ReferenceCountUtil.release(buffer); + if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) { // this still counts as a valid response, e.g., if the client crashed without writing any entry heardValidResponse = true; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index ed9985f..787a5e3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.base.Optional; +import io.netty.util.HashedWheelTimer; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.Feature; @@ -42,7 +43,6 @@ import org.apache.bookkeeper.net.NodeBase; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.commons.lang3.tuple.Pair; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java index 01b81c9..c896e93 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java @@ -21,10 +21,11 @@ import org.apache.bookkeeper.client.DigestManager.RecoveryData; import org.apache.bookkeeper.client.ReadLastConfirmedOp.LastConfirmedDataCallback; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + import java.util.List; /** @@ -60,7 +61,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback { } @Override - public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) { if (LOG.isTraceEnabled()) { LOG.trace("TryReadLastConfirmed received response for (lid={}, eid={}) : {}", new Object[] { ledgerId, entryId, rc }); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 6b913d4..ad025f2 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -51,6 +51,7 @@ public class ClientConfiguration extends AbstractConfiguration { // NIO Parameters protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay"; + protected final static String CLIENT_SOCK_KEEPALIVE = "clientSockKeepalive"; protected final static String CLIENT_SENDBUFFER_SIZE = "clientSendBufferSize"; protected final static String CLIENT_RECEIVEBUFFER_SIZE = "clientReceiveBufferSize"; protected final static String CLIENT_WRITEBUFFER_LOW_WATER_MARK = "clientWriteBufferLowWaterMark"; @@ -267,6 +268,29 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * get socket keepalive + * + * @return socket keepalive setting + */ + public boolean getClientSockKeepalive() { + return getBoolean(CLIENT_SOCK_KEEPALIVE, true); + } + + /** + * Set socket keepalive setting. + * + * This setting is used to send keep-alive messages on connection-oriented sockets. + * + * @param keepalive + * KeepAlive setting + * @return client configuration + */ + public ClientConfiguration setClientSockKeepalive(boolean keepalive) { + setProperty(CLIENT_SOCK_KEEPALIVE, Boolean.toString(keepalive)); + return this; + } + + /** * Get client netty channel send buffer size. 
* * @return client netty channel send buffer size http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 2f8bb9a..ab8bbaa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.Beta; -import com.google.common.collect.Lists; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -90,6 +89,8 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion"; // NIO Parameters protected final static String SERVER_TCP_NODELAY = "serverTcpNoDelay"; + protected final static String SERVER_SOCK_KEEPALIVE = "serverSockKeepalive"; + protected final static String SERVER_SOCK_LINGER = "serverTcpLinger"; // Zookeeper Parameters protected final static String ZK_TIMEOUT = "zkTimeout"; @@ -133,6 +134,11 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass"; + // Rx adaptive ByteBuf allocator parameters + protected final static String BYTEBUF_ALLOCATOR_SIZE_INITIAL = "byteBufAllocatorSizeInitial"; + protected final static String BYTEBUF_ALLOCATOR_SIZE_MIN = "byteBufAllocatorSizeMin"; + protected final static String BYTEBUF_ALLOCATOR_SIZE_MAX = "byteBufAllocatorSizeMax"; + // Bookie auth provider factory class name protected final static String 
BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass"; @@ -741,6 +747,54 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Timeout to drain the socket on close. + * + * @return socket linger setting + */ + public int getServerSockLinger() { + return getInt(SERVER_SOCK_LINGER, 0); + } + + /** + * Set socket linger timeout on close. + * + * When enabled, a close or shutdown will not return until all queued messages for the socket have been successfully + * sent or the linger timeout has been reached. Otherwise, the call returns immediately and the closing is done in + * the background. + * + * @param linger + * linger timeout setting + * @return server configuration + */ + public ServerConfiguration setServerSockLinger(int linger) { + setProperty(SERVER_SOCK_LINGER, Integer.toString(linger)); + return this; + } + + /** + * get socket keepalive + * + * @return socket keepalive setting + */ + public boolean getServerSockKeepalive() { + return getBoolean(SERVER_SOCK_KEEPALIVE, true); + } + + /** + * Set socket keepalive setting. + * + * This setting is used to send keep-alive messages on connection-oriented sockets. 
+ * + * @param keepalive + * KeepAlive setting + * @return server configuration + */ + public ServerConfiguration setServerSockKeepalive(boolean keepalive) { + setProperty(SERVER_SOCK_KEEPALIVE, Boolean.toString(keepalive)); + return this; + } + + /** * Get zookeeper servers to connect * * @return zookeeper servers @@ -1749,6 +1803,63 @@ public class ServerConfiguration extends AbstractConfiguration { } } + /** + * Get Recv ByteBuf allocator initial buf size + * + * @return initial byteBuf size + */ + public int getRecvByteBufAllocatorSizeInitial() { + return getInt(BYTEBUF_ALLOCATOR_SIZE_INITIAL, 64 * 1024); + } + + /** + * Set Recv ByteBuf allocator initial buf size + * + * @param size + * buffer size + */ + public void setRecvByteBufAllocatorSizeInitial(int size) { + setProperty(BYTEBUF_ALLOCATOR_SIZE_INITIAL, size); + } + + /** + * Get Recv ByteBuf allocator min buf size + * + * @return min byteBuf size + */ + public int getRecvByteBufAllocatorSizeMin() { + return getInt(BYTEBUF_ALLOCATOR_SIZE_MIN, 64 * 1024); + } + + /** + * Set Recv ByteBuf allocator min buf size + * + * @param size + * buffer size + */ + public void setRecvByteBufAllocatorSizeMin(int size) { + setProperty(BYTEBUF_ALLOCATOR_SIZE_MIN, size); + } + + /** + * Get Recv ByteBuf allocator max buf size + * + * @return max byteBuf size + */ + public int getRecvByteBufAllocatorSizeMax() { + return getInt(BYTEBUF_ALLOCATOR_SIZE_MAX, 1 * 1024 * 1024); + } + + /** + * Set Recv ByteBuf allocator max buf size + * + * @param size + * buffer size + */ + public void setRecvByteBufAllocatorSizeMax(int size) { + setProperty(BYTEBUF_ALLOCATOR_SIZE_MAX, size); + } + /* * Set the bookie authentication provider factory class name. 
* If this is not set, no authentication will be used http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java index 8947abf..382c221 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java @@ -23,8 +23,9 @@ package org.apache.bookkeeper.net; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import io.netty.channel.local.LocalAddress; + import static org.apache.bookkeeper.util.BookKeeperConstants.COLON; -import org.jboss.netty.channel.local.LocalAddress; /** * This is a data wrapper class that is an InetSocketAddress, it would use the hostname http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java index 5dce906..e438634 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java @@ -18,12 +18,13 @@ package org.apache.bookkeeper.net; import org.apache.bookkeeper.util.MathUtils; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.HashedWheelTimer; 
+import io.netty.util.Timeout; +import io.netty.util.TimerTask; + import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index d785d29..658753c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -20,7 +20,7 @@ */ package org.apache.bookkeeper.processor; -import org.jboss.netty.channel.Channel; +import io.netty.channel.Channel; public interface RequestProcessor {
