Repository: zookeeper Updated Branches: refs/heads/master 1507f67a0 -> caca06276
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java index 15f993c..7c51a12 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java @@ -23,6 +23,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.TestByteBufAllocator; import org.apache.zookeeper.server.quorum.BufferStats; import org.apache.zookeeper.test.ClientBase; import org.junit.Assert; @@ -31,9 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThat; /** @@ -48,9 +51,17 @@ public class NettyServerCnxnTest extends ClientBase { public void setUp() throws Exception { System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory"); + NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance()); super.setUp(); } + @Override + public void tearDown() throws Exception { + super.tearDown(); + NettyServerCnxnFactory.clearTestAllocator(); + TestByteBufAllocator.checkForLeaks(); + } + /** * Test verifies the channel closure - while closing the channel * servercnxnfactory should remove all channel references to avoid @@ -110,6 +121,66 @@ public class NettyServerCnxnTest extends ClientBase { assertThat("Last client response size should be greater than 0 after client request was performed", clientResponseStats.getLastBufferSize(), greaterThan(0)); + + byte[] contents = zk.getData("/a", null, null); + assertArrayEquals("unexpected data", "test".getBytes(), contents); + } + } + + @Test + public void testServerSideThrottling() throws IOException, InterruptedException, KeeperException { + try (ZooKeeper zk = createClient()) { + BufferStats clientResponseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats(); + assertThat("Last client response size should be initialized with INIT_VALUE", + clientResponseStats.getLastBufferSize(), equalTo(BufferStats.INIT_VALUE)); + + zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + assertThat("Last client response size should be greater than 0 after client request was performed", + clientResponseStats.getLastBufferSize(), greaterThan(0)); + + for (final ServerCnxn cnxn : serverFactory.cnxns) { + final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn); + // Disable receiving data for all open connections ... + nettyCnxn.disableRecv(); + // ... then force a throttled read after 1 second (this puts the read into queuedBuffer) ... + nettyCnxn.getChannel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + nettyCnxn.getChannel().read(); + } + }, 1, TimeUnit.SECONDS); + + // ... and finally disable throttling after 2 seconds. + nettyCnxn.getChannel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + nettyCnxn.enableRecv(); + } + }, 2, TimeUnit.SECONDS); + } + + byte[] contents = zk.getData("/a", null, null); + assertArrayEquals("unexpected data", "test".getBytes(), contents); + + // As above, but don't do the throttled read. Make the request bytes wait in the socket + // input buffer until after throttling is turned off. Need to make sure both modes work. + for (final ServerCnxn cnxn : serverFactory.cnxns) { + final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn); + // Disable receiving data for all open connections ... + nettyCnxn.disableRecv(); + // ... then disable throttling after 2 seconds. + nettyCnxn.getChannel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + nettyCnxn.enableRecv(); + } + }, 2, TimeUnit.SECONDS); + } + + contents = zk.getData("/a", null, null); + assertArrayEquals("unexpected data", "test".getBytes(), contents); } } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java index 6373bb3..c337e3c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java @@ -855,6 +855,7 @@ public class ClientTest extends ClientBase { // Sending a nonexisting opcode should cause the server to disconnect Assert.assertTrue("failed to disconnect", clientDisconnected.await(5000, TimeUnit.MILLISECONDS)); + zk.close(); } @Test http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java index 684d67a..bbcf869 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java @@ -22,7 +22,9 @@ import org.apache.zookeeper.ClientCnxnSocketNetty; import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.server.NettyServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -46,4 +48,15 @@ public class NettyNettySuiteBase { System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); } + + @Before + public void setUpTest() throws Exception { + TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance()); + } + + @After + public void tearDownTest() throws Exception { + TestByteBufAllocatorTestHelper.clearTestAllocator(); + TestByteBufAllocator.checkForLeaks(); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java index 5725c17..836eaa0 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java @@ -20,7 +20,9 @@ package org.apache.zookeeper.test; import org.apache.zookeeper.server.NettyServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -41,4 +43,15 @@ public class NioNettySuiteBase { public static void tearDown() { System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY); } + + @Before + public void setUpTest() throws Exception { + TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance()); + } + + @After + public void tearDownTest() throws Exception { + TestByteBufAllocatorTestHelper.clearTestAllocator(); + TestByteBufAllocator.checkForLeaks(); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 8d10dc9..7b39ab1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -60,6 +60,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ .getLogger(ReconfigTest.class); private QuorumUtil qu; + private ZooKeeper[] zkArr; + private ZooKeeperAdmin[] zkAdminArr; @Before public void setup() { @@ -70,6 +72,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ @After public void tearDown() throws Exception { + closeAllHandles(zkArr, zkAdminArr); if (qu != null) { qu.tearDown(); } @@ -237,12 +240,16 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ } public static void closeAllHandles(ZooKeeper[] zkArr, ZooKeeperAdmin[] zkAdminArr) throws InterruptedException { - for (ZooKeeper zk : zkArr) - if (zk != null) - zk.close(); - for (ZooKeeperAdmin zkAdmin : zkAdminArr) - if (zkAdmin != null) - zkAdmin.close(); + if (zkArr != null) { + for (ZooKeeper zk : zkArr) + if (zk != null) + zk.close(); + } + if (zkAdminArr != null) { + for (ZooKeeperAdmin zkAdmin : zkAdminArr) + if (zkAdmin != null) + zkAdmin.close(); + } } @Test @@ -250,8 +257,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> leavingServers = new ArrayList<String>(); List<String> joiningServers = new ArrayList<String>(); @@ -317,8 +324,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ leavingServers.clear(); joiningServers.clear(); } - - closeAllHandles(zkArr, zkAdminArr); } /** @@ -332,8 +337,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(2); // create 5 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> leavingServers = new ArrayList<String>(); List<String> joiningServers = new ArrayList<String>(); @@ -423,8 +428,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING); testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]); testServerHasConfig(zkArr[leavingIndex2], joiningServers, null); - - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -432,8 +435,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(3); // create 7 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); // new config will have three of the servers as followers // two of the servers as observers, and all ports different @@ -462,8 +465,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu.shutdown(4); testNormalOperation(zkArr[1], zkArr[2]); - - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -471,8 +472,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(2); qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> leavingServers = new ArrayList<String>(); @@ -493,8 +494,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[1], zkArr[2]); for (int i=1; i<=5; i++) testServerHasConfig(zkArr[i], null, leavingServers); - - closeAllHandles(zkArr, zkAdminArr); } @SuppressWarnings("unchecked") @@ -512,8 +511,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); // changing a server's role / port is done by "adding" it with the same // id but different role / port @@ -581,7 +580,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ changingIndex = leaderIndex; } } - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -589,8 +587,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> joiningServers = new ArrayList<String>(); @@ -705,8 +703,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[follower2], zkArr[follower1]); testServerHasConfig(zkArr[follower1], joiningServers, null); testServerHasConfig(zkArr[follower2], joiningServers, null); - - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -722,8 +718,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> joiningServers = new ArrayList<String>(); @@ -796,7 +792,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testServerHasConfig(zkArr[serverIndex], joiningServers, null); Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort()); } - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -818,8 +813,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(3); // create 7 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); ArrayList<String> members = new ArrayList<String>(); members.add("group.1=3:4:5"); @@ -886,8 +881,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ + i + " doesn't think the quorum system is a majority quorum system!"); } - - closeAllHandles(zkArr, zkAdminArr); } @Test @@ -895,7 +888,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); + zkArr = createHandles(qu); testNormalOperation(zkArr[1], zkArr[2]); for (int i=1; i<4; i++) { String configStr = testServerHasConfig(zkArr[i], null, null); @@ -914,8 +907,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); List<String> leavingServers = new ArrayList<String>(); List<String> joiningServers = new ArrayList<String>(); @@ -980,8 +973,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ // assert remotePeerBean.1 of ReplicatedServer_3 leavingQS3 = peer3.getView().get(new Long(leavingIndex)); assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3); - - closeAllHandles(zkArr, zkAdminArr); } /** @@ -993,8 +984,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ qu = new QuorumUtil(1); // create 3 servers qu.disableJMXTest = true; qu.startAll(); - ZooKeeper[] zkArr = createHandles(qu); - ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu); + zkArr = createHandles(qu); + zkAdminArr = createAdminHandles(qu); // changing a server's role / port is done by "adding" it with the same // id but different role / port @@ -1055,8 +1046,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ // assert remotePeerBean.1 of ReplicatedServer_3 changingQS3 = peer3.getView().get(new Long(changingIndex)); assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3); - - closeAllHandles(zkArr, zkAdminArr); } private void assertLocalPeerMXBeanAttributes(QuorumPeer qp, http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java new file mode 100644 index 0000000..dc13222 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + * + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { + private static AtomicReference<TestByteBufAllocator> INSTANCE = + new AtomicReference<>(null); + + /** + * Get the singleton testing allocator. + * @return the singleton allocator, creating it if one does not exist. + */ + public static TestByteBufAllocator getInstance() { + TestByteBufAllocator result = INSTANCE.get(); + if (result == null) { + ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel(); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel)); + result = INSTANCE.get(); + } + return result; + } + + /** + * Destroys the singleton testing allocator and throws an error if any of the + * buffers allocated by it have been leaked. Attempts to print leak details to + * standard error before throwing, by using netty's built-in leak tracking. + * Note that this might not always work, since it only triggers when a buffer + * is garbage-collected and calling System.gc() does not guarantee that a buffer + * will actually be GC'ed. + * + * This should be called at the end of a unit test's tearDown() method. + */ + public static void checkForLeaks() { + TestByteBufAllocator result = INSTANCE.getAndSet(null); + if (result != null) { + result.checkInstanceForLeaks(); + } + } + + private final List<ByteBuf> trackedBuffers = new ArrayList<>(); + private final ResourceLeakDetector.Level oldLevel; + + private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel) + { + super(false); + this.oldLevel = oldLevel; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) + { + return track(super.newHeapBuffer(initialCapacity, maxCapacity)); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) + { + return track(super.newDirectBuffer(initialCapacity, maxCapacity)); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) + { + return track(super.compositeHeapBuffer(maxNumComponents)); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) + { + return track(super.compositeDirectBuffer(maxNumComponents)); + } + + private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf) + { + trackedBuffers.add(Objects.requireNonNull(byteBuf)); + return byteBuf; + } + + private synchronized ByteBuf track(ByteBuf byteBuf) + { + trackedBuffers.add(Objects.requireNonNull(byteBuf)); + return byteBuf; + } + + private void checkInstanceForLeaks() + { + try { + long referencedBuffersCount = 0; + synchronized (this) { + referencedBuffersCount = trackedBuffers.stream() + .filter(byteBuf -> byteBuf.refCnt() > 0) + .count(); + // Make tracked buffers eligible for GC + trackedBuffers.clear(); + } + // Throw an error if there were any leaked buffers + if (referencedBuffersCount > 0) { + // Trigger a GC. This will hopefully (but not necessarily) print + // details about detected leaks to standard error before the error + // is thrown. + System.gc(); + throw new AssertionError("Found a netty ByteBuf leak!"); + } + } finally { + ResourceLeakDetector.setLevel(oldLevel); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java new file mode 100644 index 0000000..de5e751 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import io.netty.buffer.ByteBufAllocator; +import org.apache.zookeeper.ClientCnxnSocketNetty; +import org.apache.zookeeper.server.NettyServerCnxnFactory; + +/** + * Uses reflection to call package-private methods in Netty connection classes + * to set/clear the test ByteBufAllocator. + */ +public class TestByteBufAllocatorTestHelper { + public static void setTestAllocator(ByteBufAllocator allocator) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class); + m1.setAccessible(true); + m1.invoke(null, allocator); + Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class); + m2.setAccessible(true); + m2.invoke(null, allocator); + } + + public static void clearTestAllocator() + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("clearTestAllocator"); + m1.setAccessible(true); + m1.invoke(null); + Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("clearTestAllocator"); + m2.setAccessible(true); + m2.invoke(null); + } +} \ No newline at end of file
