Repository: bookkeeper Updated Branches: refs/heads/master 811ece53a -> 74f795136
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index dc43b2c..25b0d6b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -17,15 +17,15 @@ */ package org.apache.bookkeeper.client; -import org.jboss.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; public class ClientUtil { - public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed, + public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data) { return generatePacket(ledgerId, entryId, lastAddConfirmed, length, data, 0, data.length); } - public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed, + public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data, int offset, int len) { CRC32DigestManager dm = new CRC32DigestManager(ledgerId); return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java index 521d1e3..d77d184 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java @@ -165,6 +165,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase { try { while (!finished.get()) { lh.addEntry(entry); + Thread.sleep(1); } } catch (Exception e) { LOG.error("Exception in add entry thread", e); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java index ed41cb2..6ff1535 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java @@ -24,7 +24,6 @@ package org.apache.bookkeeper.client; import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -32,18 +31,19 @@ import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; -import org.apache.bookkeeper.proto.BookkeeperProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; +import org.apache.bookkeeper.proto.BookkeeperProtocol; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + /** * This unit test tests timeout of GetBookieInfo request; * @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { private final static Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class); DigestType digestType; - public ClientSocketChannelFactory channelFactory; + public EventLoopGroup eventLoopGroup; public OrderedSafeExecutor executor; public TestGetBookieInfoTimeout() { @@ -62,8 +62,8 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { @Before public void setUp() throws Exception { super.setUp(); - channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors - .newCachedThreadPool()); + eventLoopGroup = new NioEventLoopGroup(); + executor = OrderedSafeExecutor.newBuilder() .name("BKClientOrderedSafeExecutor") .numThreads(2) @@ -72,7 +72,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { @After public void tearDown() throws Exception { - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -99,7 +99,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { // try to get bookie info from the sleeping bookie. It should fail with timeout error BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(), bookieToSleep.getPort()); - BookieClient bc = new BookieClient(cConf, channelFactory, executor); + BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor); long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 6739ea4..99f8f5b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.util.HashedWheelTimer; import junit.framework.TestCase; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; @@ -40,7 +41,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.NetworkTopology; import org.apache.bookkeeper.util.StaticDNSResolver; -import org.jboss.netty.util.HashedWheelTimer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +54,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { final List<Integer> writeSet = new ArrayList<Integer>(); ClientConfiguration conf = new ClientConfiguration(); BookieSocketAddress addr1, addr2, addr3, addr4; - HashedWheelTimer timer; + io.netty.util.HashedWheelTimer timer; @Override protected void setUp() throws Exception { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java index af6a44e..ef90401 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java @@ -39,7 +39,6 @@ import org.apache.bookkeeper.net.CommonConfigurationKeys; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.ScriptBasedMapping; import org.apache.bookkeeper.util.Shell; -import org.jboss.netty.util.HashedWheelTimer; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -50,6 +49,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.util.HashedWheelTimer; + /** * In this testsuite, ScriptBasedMapping is used as DNS_RESOLVER_CLASS for * mapping nodes to racks. Shell Script - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 5c61ae3..09c6262 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.util.HashedWheelTimer; + import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.Feature; @@ -38,7 +41,6 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.NetworkTopology; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.StaticDNSResolver; -import org.jboss.netty.util.HashedWheelTimer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java index 5a1f7fc..41f35a7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java @@ -32,6 +32,9 @@ import org.apache.bookkeeper.test.BaseTestCase; import org.junit.Assert; import org.junit.Test; +import io.netty.channel.Channel; +import io.netty.channel.local.LocalChannel; + /** * Tests of the main BookKeeper client using networkless comunication */ @@ -68,12 +71,11 @@ public class NetworkLessBookieTest extends BaseTestCase { } for (BookieServer bk : bs) { - for (ChannelManager channel : bk.nettyServer.channels) { - if (! (channel instanceof VMLocalChannelManager)) { + for (Channel channel : bk.nettyServer.allChannels) { + if (!(channel instanceof LocalChannel)) { Assert.fail(); } } } } - } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 0a9bdc4..ed2a4b5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -29,27 +29,22 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.ExtensionRegistry; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.auth.TestAuth; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import org.apache.bookkeeper.proto.BookieProtocol.*; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ArrayBlockingQueue; import static org.junit.Assert.*; @@ -63,14 +58,14 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { ExtensionRegistry extRegistry = ExtensionRegistry.newInstance(); ClientAuthProvider.Factory authProvider; - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient") .build(); public TestBackwardCompatCMS42() throws Exception { super(0); + + baseConf.setGcWaitTime(60000); authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory( new ClientConfiguration()); } @@ -179,44 +174,36 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { } CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception { - return new CompatClient42(executor, channelFactory, addr, authProvider, extRegistry); + return new CompatClient42(executor, eventLoopGroup, addr, authProvider, extRegistry); } // extending PerChannelBookieClient to get the pipeline factory class CompatClient42 extends PerChannelBookieClient { final ArrayBlockingQueue<Response> responses = new ArrayBlockingQueue<Response>(10); - final Channel channel; + Channel channel; final CountDownLatch connected = new CountDownLatch(1); - CompatClient42(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, + CompatClient42(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry) throws Exception { - super(executor, channelFactory, addr, authProviderFactory, extRegistry); - - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - bootstrap.setPipelineFactory(this); - bootstrap.setOption("tcpNoDelay", false); - bootstrap.setOption("keepAlive", true); - ChannelFuture f = bootstrap.connect(addr.getSocketAddress()).await(); - channel = f.getChannel(); + super(executor, eventLoopGroup, addr, authProviderFactory, extRegistry); + + state = ConnectionState.CONNECTING; + ChannelFuture future = connect(); + future.await(); + channel = future.channel(); + connected.countDown(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (!(e.getMessage() instanceof Response)) { - LOG.error("Unknown message {}, passing upstream", e.getMessage()); - ctx.sendUpstream(e); + public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception { + if (!(msg instanceof Response)) { + LOG.error("Unknown message {}, passing upstream", msg); + ctx.fireChannelRead(msg); return; } - responses.add((Response)e.getMessage()); - } - - @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) - throws Exception { - connected.countDown(); + responses.add((Response) msg); } Response takeResponse() throws Exception { @@ -229,7 +216,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { void sendRequest(Request request) throws Exception { connected.await(); - channel.write(request); + channel.writeAndFlush(request); } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java index 2095b66..16bbfd0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java @@ -33,20 +33,20 @@ import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.protobuf.ExtensionRegistry; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -78,14 +78,12 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { */ @Test(timeout=60000) public void testConnectCloseRace() throws Exception { - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 1000; i++) { - PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr, + PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, addr, authProvider, extRegistry); client.connectIfNeededAndDoOp(new GenericCallback<PerChannelBookieClient>() { @Override @@ -96,7 +94,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { }); client.close(); } - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -123,21 +121,19 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { // we just want to trigger it connecting. } }; - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 100; i++) { - PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr, + PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, addr, authProvider, extRegistry); for (int j = i; j < 10; j++) { client.connectIfNeededAndDoOp(nullop); } client.close(); } - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -157,16 +153,13 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { } }; final int ITERATIONS = 100000; - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); - final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, + final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, addr, authProvider, extRegistry); final AtomicBoolean shouldFail = new AtomicBoolean(false); - final AtomicBoolean inconsistent = new AtomicBoolean(false); final AtomicBoolean running = new AtomicBoolean(true); final CountDownLatch disconnectRunning = new CountDownLatch(1); Thread connectThread = new Thread() { @@ -206,12 +199,12 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { if ((state == ConnectionState.CONNECTED && (channel == null - || !channel.isConnected())) + || !channel.isActive())) || (state != ConnectionState.CONNECTED && channel != null - && channel.isConnected())) { + && channel.isActive())) { LOG.error("State({}) and channel({}) inconsistent " + channel, - state, channel == null ? null : channel.isConnected()); + state, channel == null ? null : channel.isActive()); shouldFail.set(true); running.set(false); } @@ -228,7 +221,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { checkThread.join(); assertFalse("Failure in threads, check logs", shouldFail.get()); client.close(); - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -255,19 +248,17 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { bsConfs.add(conf); bs.add(startBookie(conf, delayBookie)); - ClientSocketChannelFactory channelFactory - = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); - final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, + final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, addr, authProvider, extRegistry); final CountDownLatch completion = new CountDownLatch(1); final ReadEntryCallback cb = new ReadEntryCallback() { @Override public void readEntryComplete(int rc, long ledgerId, long entryId, - ChannelBuffer buffer, Object ctx) { + ByteBuf buffer, Object ctx) { completion.countDown(); } }; @@ -292,7 +283,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { Thread.sleep(1000); client.disconnect(); client.close(); - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); assertTrue("Request should have completed", completion.await(5, TimeUnit.SECONDS)); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index efb8375..a71dcb5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -446,6 +446,7 @@ public abstract class BookKeeperClusterTestCase { throws Exception { ServerConfiguration conf = newServerConfiguration(); bsConfs.add(conf); + LOG.info("Starting new bookie on port: {}", conf.getBookiePort()); bs.add(startBookie(conf)); return conf.getBookiePort(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 0698780..3d9a540 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -21,12 +21,16 @@ package org.apache.bookkeeper.test; * */ +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException.Code; @@ -44,24 +48,18 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.IOUtils; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.Assert.*; public class BookieClientTest { - private final static Logger LOG = LoggerFactory.getLogger(BookieClientTest.class); BookieServer bs; File tmpDir; public int port = 13645; - public ClientSocketChannelFactory channelFactory; + + public EventLoopGroup eventLoopGroup; public OrderedSafeExecutor executor; ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); @@ -77,8 +75,7 @@ public class BookieClientTest { .setLedgerDirNames(new String[] { tmpDir.getPath() }); bs = new BookieServer(conf); bs.start(); - channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors - .newCachedThreadPool()); + eventLoopGroup = new NioEventLoopGroup(); executor = OrderedSafeExecutor.newBuilder() .name("BKClientOrderedSafeExecutor") .numThreads(2) @@ -89,7 +86,7 @@ public class BookieClientTest { public void tearDown() throws Exception { bs.shutdown(); recursiveDelete(tmpDir); - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -110,13 +107,13 @@ public class BookieClientTest { ReadEntryCallback recb = new ReadEntryCallback() { - public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer bb, Object ctx) { + public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf bb, Object ctx) { ResultStruct rs = (ResultStruct) ctx; synchronized (rs) { rs.rc = rc; if (BKException.Code.OK == rc && bb != null) { bb.readerIndex(24); - rs.entry = bb.toByteBuffer(); + rs.entry = bb.nioBuffer(); } rs.notifyAll(); } @@ -146,9 +143,8 @@ public class BookieClientTest { BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); ResultStruct arc = new ResultStruct(); - BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); - ChannelBuffer bb; - bb = createByteBuffer(1, 1, 1); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); + ByteBuf bb = createByteBuffer(1, 1, 1); bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE); synchronized (arc) { arc.wait(1000); @@ -234,22 +230,20 @@ public class BookieClientTest { } } - private ChannelBuffer createByteBuffer(int i, long lid, long eid) { - ByteBuffer bb; - bb = ByteBuffer.allocate(4 + 24); - bb.putLong(lid); - bb.putLong(eid); - bb.putLong(eid-1); - bb.putInt(i); - bb.flip(); - return ChannelBuffers.wrappedBuffer(bb); + private ByteBuf createByteBuffer(int i, long lid, long eid) { + ByteBuf bb = Unpooled.buffer(4 + 16); + bb.writeLong(lid); + bb.writeLong(eid); + bb.writeLong(eid - 1); + bb.writeInt(i); + return bb; } @Test(timeout=60000) public void testNoLedger() throws Exception { ResultStruct arc = new ResultStruct(); BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); - BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); synchronized (arc) { bc.readEntry(addr, 2, 13, recb, arc); arc.wait(1000); @@ -260,7 +254,7 @@ public class BookieClientTest { @Test(timeout=60000) public void testGetBookieInfo() throws IOException, InterruptedException { BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); - BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor); long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java index 4d29ff4..5f1f839 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java @@ -260,7 +260,7 @@ public class BookieReadWriteTest extends MultiLedgerManagerMultiDigestTestCase } } - @Test + @Test(timeout=60000) public void testReadWriteAsyncSingleClient200() throws IOException { testReadWriteAsyncSingleClient(200); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java index 3a36129..771628b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java @@ -21,9 +21,12 @@ package org.apache.bookkeeper.test; * */ +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.Executors; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -31,9 +34,6 @@ import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +67,8 @@ class LoopbackClient implements WriteCallback { } } - LoopbackClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor, long begin, int limit) throws IOException { - this.client = new BookieClient(new ClientConfiguration(), channelFactory, executor); + LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor executor, long begin, int limit) throws IOException { + this.client = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); this.begin = begin; } @@ -78,7 +78,7 @@ class LoopbackClient implements WriteCallback { byte[] passwd = new byte[20]; Arrays.fill(passwd, (byte) 'a'); - client.addEntry(addr, ledgerId, passwd, entry, ChannelBuffers.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE); + client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE); } public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { @@ -94,15 +94,14 @@ class LoopbackClient implements WriteCallback { long begin = System.currentTimeMillis(); LoopbackClient lb; - ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors - .newCachedThreadPool()); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() .name("BookieClientScheduler") .numThreads(2) .build(); try { BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue()); - lb = new LoopbackClient(channelFactory, executor, begin, limit.intValue()); + lb = new LoopbackClient(eventLoopGroup, executor, begin, limit.intValue()); for (int i = 0; i < limit; i++) { lb.write(ledgerId, i, data, addr, lb, c); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java new file mode 100644 index 0000000..b619d0e --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/DoubleByteBufTest.java @@ -0,0 +1,121 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.bookkeeper.util; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +public class DoubleByteBufTest { + + @Test(timeout = 30000) + public void testGetBytes() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }); + ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 4, 5, 6 }); + doTest(b1, b2); + } + + @Test(timeout = 30000) + public void testGetBytesWithDoubleByteBufAssource() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 }); + ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 }); + ByteBuf b3 = Unpooled.wrappedBuffer(new byte[] { 5, 6 }); + + ByteBuf b23 = DoubleByteBuf.get(b2, b3); + doTest(b1, b23); + } + + @Test(timeout = 30000) + public void testGetBytesWithIndex() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }); + ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 9, 9, 4, 5, 6 }); + + // Skip the two '9' from b2 + b2.readByte(); + b2.readByte(); + + doTest(b1, b2); + } + + private void doTest(ByteBuf b1, ByteBuf b2) { + ByteBuf buf = DoubleByteBuf.get(b1, b2); + + assertEquals(6, buf.readableBytes()); + assertEquals(0, buf.writableBytes()); + + ByteBuf dst1 = Unpooled.buffer(6); + buf.getBytes(0, dst1); + assertEquals(6, dst1.readableBytes()); + assertEquals(0, dst1.writableBytes()); + assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6 }), dst1); + + ByteBuf dst2 = Unpooled.buffer(6); + buf.getBytes(0, dst2, 4); + assertEquals(4, dst2.readableBytes()); + assertEquals(2, dst2.writableBytes()); + assertEquals(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3, 4 }), dst2); + + ByteBuf dst3 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 }); + buf.getBytes(0, dst3, 1, 4); + assertEquals(6, dst3.readableBytes()); + assertEquals(0, dst3.writableBytes()); + assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 0 }), dst3); + + ByteBuf dst4 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 }); + buf.getBytes(2, dst4, 1, 3); + assertEquals(6, dst4.readableBytes()); + assertEquals(0, dst4.writableBytes()); + assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 3, 4, 5, 0, 0 }), dst4); + + ByteBuf dst5 = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 }); + buf.getBytes(3, dst5, 1, 3); + assertEquals(6, dst5.readableBytes()); + assertEquals(0, dst5.writableBytes()); + assertEquals(Unpooled.wrappedBuffer(new byte[] { 0, 4, 5, 6, 0, 0 }), dst5); + } + + @Test(timeout = 30000) + public void testCopyToArray() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 }); + ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 }); + ByteBuf b = DoubleByteBuf.get(b1, b2); + + byte[] a1 = new byte[4]; + b.getBytes(0, a1); + assertArrayEquals(new byte[] { 1, 2, 3, 4 }, a1); + + byte[] a2 = new byte[3]; + b.getBytes(1, a2); + assertArrayEquals(new byte[] { 2, 3, 4 }, a2); + } + + @Test(timeout = 30000) + public void testToByteBuffer() { + ByteBuf b1 = Unpooled.wrappedBuffer(new byte[] { 1, 2 }); + ByteBuf b2 = Unpooled.wrappedBuffer(new byte[] { 3, 4 }); + ByteBuf b = DoubleByteBuf.get(b1, b2); + + assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), b.nioBuffer()); + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 73f62b4..a892cef 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <protobuf.version>2.6.1</protobuf.version> <guava.version>13.0.1</guava.version> - <netty.version>3.9.4.Final</netty.version> + <netty.version>4.1.10.Final</netty.version> <zookeeper.version>3.5.1-alpha</zookeeper.version> </properties> <url>http://zookeeper.apache.org/bookkeeper</url>
