Address a few flaky test cases
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f607a48f Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f607a48f Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f607a48f Branch: refs/heads/master Commit: f607a48ff10499e3bb764a44cab8619c355d3bf4 Parents: 63d6bde Author: Sijie Guo <sij...@twitter.com> Authored: Thu Dec 29 15:16:20 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 16:06:20 2016 -0800 ---------------------------------------------------------------------- .../service/stream/StreamImpl.java | 48 +++++++++---------- .../service/TestDistributedLogService.java | 22 +++++++-- .../placement/TestLeastLoadPlacementPolicy.java | 11 +++-- .../service/placement/TestServerLoad.java | 4 +- .../service/placement/TestStreamLoad.java | 2 +- .../placement/TestZKPlacementStateManager.java | 50 ++++++++++++-------- 6 files changed, 81 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 9f049c8..36904fd 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -82,7 +82,7 @@ public class StreamImpl implements Stream { * any error, the stream should be put in error state. If a stream is in error state, * it should be removed and not reused anymore. */ - public static enum StreamStatus { + public enum StreamStatus { UNINITIALIZED(-1), INITIALIZING(0), INITIALIZED(1), @@ -101,7 +101,7 @@ public class StreamImpl implements Stream { return code; } - static boolean isUnavailable(StreamStatus status) { + public static boolean isUnavailable(StreamStatus status) { return StreamStatus.ERROR == status || StreamStatus.CLOSING == status || StreamStatus.CLOSED == status; } } @@ -763,23 +763,7 @@ public class StreamImpl implements Stream { // we will fail the requests that are coming in between closing and closed only // after the async writer is closed. so we could clear up the lock before redirect // them. - close(abort); - unregisterGauge(); - if (uncache) { - final long probationTimeoutMs; - if (null != owner) { - probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; - } else { - probationTimeoutMs = 0L; - } - closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { - @Override - public BoxedUnit apply(Void result) { - streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs); - return BoxedUnit.UNIT; - } - }); - } + close(abort, uncache); return closePromise; } @@ -799,11 +783,29 @@ public class StreamImpl implements Stream { } /** + * Post action executed after closing. + */ + private void postClose(boolean uncache) { + closeManagerAndErrorOutPendingRequests(); + unregisterGauge(); + if (uncache) { + if (null != owner) { + long probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3; + streamManager.scheduleRemoval(this, probationTimeoutMs); + } else { + streamManager.notifyRemoved(this); + logger.info("Removed cached stream {}.", getStreamName()); + } + } + FutureUtils.setValue(closePromise, null); + } + + /** * Shouldn't call close directly. The callers should call #requestClose instead * * @param shouldAbort shall we abort the stream instead of closing */ - private Future<Void> close(boolean shouldAbort) { + private Future<Void> close(boolean shouldAbort, final boolean uncache) { boolean abort; closeLock.writeLock().lock(); try { @@ -841,16 +843,14 @@ public class StreamImpl implements Stream { new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { - closeManagerAndErrorOutPendingRequests(); - FutureUtils.setValue(closePromise, null); + postClose(uncache); } @Override public void onFailure(Throwable cause) { if (cause instanceof TimeoutException) { writerCloseTimeoutCounter.inc(); } - closeManagerAndErrorOutPendingRequests(); - FutureUtils.setValue(closePromise, null); + postClose(uncache); } }, scheduler, name)); return closePromise; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 1bfe352..f97399d 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -211,8 +211,9 @@ public class TestDistributedLogService extends TestDistributedLogBase { WriteResponse wr1 = Await.result(op1.result()); assertEquals("Op 1 should fail", StatusCode.FOUND, wr1.getHeader().getCode()); - assertEquals("Service 1 should be in ERROR state", - StreamStatus.ERROR, s1.getStatus()); + // the stream will be set to ERROR and then be closed. + assertTrue("Service 1 should be in unavailable state", + StreamStatus.isUnavailable(s1.getStatus())); assertNotNull(s1.getManager()); assertNull(s1.getWriter()); assertNotNull(s1.getLastException()); @@ -527,15 +528,26 @@ public class TestDistributedLogService extends TestDistributedLogBase { public void testStreamOpNoChecksum() throws Exception { DistributedLogServiceImpl localService = createConfiguredLocalService(); WriteContext ctx = new WriteContext(); - Future<WriteResponse> result = localService.release("test", ctx); + HeartbeatOptions option = new HeartbeatOptions(); + option.setSendHeartBeatToReader(true); + + // hearbeat to acquire the stream and then release the stream + Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option); WriteResponse resp = Await.result(result); assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - result = localService.delete("test", ctx); + result = localService.release("test", ctx); + resp = Await.result(result); + assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + + // heartbeat to acquire the stream and then delete the stream + result = localService.heartbeatWithOptions("test", ctx, option); resp = Await.result(result); assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); - result = localService.heartbeat("test", ctx); + result = localService.delete("test", ctx); resp = Await.result(result); assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); + + // shutdown the local service localService.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java index ab4eeae..bde33c6 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java @@ -55,7 +55,7 @@ import static org.mockito.Mockito.when; public class TestLeastLoadPlacementPolicy { - @Test + @Test(timeout = 10000) public void testCalculateBalances() throws Exception { int numSevers = new Random().nextInt(20) + 1; int numStreams = new Random().nextInt(200) + 1; @@ -73,7 +73,7 @@ public class TestLeastLoadPlacementPolicy { } } - @Test + @Test(timeout = 10000) public void testRefreshAndPlaceStream() throws Exception { int numSevers = new Random().nextInt(20) + 1; int numStreams = new Random().nextInt(200) + 1; @@ -98,7 +98,7 @@ public class TestLeastLoadPlacementPolicy { assertEquals(next.getServer(), serverPlacement); } - @Test + @Test(timeout = 10000) public void testCalculateUnequalWeight() throws Exception { int numSevers = new Random().nextInt(20) + 1; int numStreams = new Random().nextInt(200) + 1; @@ -131,7 +131,10 @@ public class TestLeastLoadPlacementPolicy { highestLoadSeen = load; } } - assertTrue(highestLoadSeen - lowestLoadSeen < maxLoad.get()); + assertTrue("Unexpected placement for " + numStreams + " streams to " + + numSevers + " servers : highest load = " + highestLoadSeen + + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(), + highestLoadSeen - lowestLoadSeen < maxLoad.get()); } private Set<SocketAddress> generateSocketAddresses(int num) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java index bbd7e72..d844f78 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java @@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals; public class TestServerLoad { - @Test + @Test(timeout = 60000) public void testSerializeDeserialize() throws IOException { final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); for (int i = 0; i < 20; i++) { @@ -34,7 +34,7 @@ public class TestServerLoad { assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize())); } - @Test + @Test(timeout = 60000) public void testGetLoad() throws IOException { final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); assertEquals(0, serverLoad.getLoad()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java index 3a3e5c0..e5091f5 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java @@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals; public class TestStreamLoad { - @Test + @Test(timeout = 10000) public void testSerializeDeserialize() throws IOException { final String streamName = "aHellaRandomStreamName"; final int load = 1337; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f607a48f/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java index b104952..c02492d 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.URI; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.curator.test.TestingServer; @@ -30,15 +31,9 @@ import org.junit.Test; import com.twitter.distributedlog.DistributedLogConfiguration; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class TestZKPlacementStateManager { private TestingServer zkTestServer; @@ -54,7 +49,7 @@ public class TestZKPlacementStateManager { zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); } - @Test + @Test(timeout = 60000) public void testSaveLoad() throws Exception { TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); zkPlacementStateManager.saveOwnership(ownerships); @@ -83,33 +78,48 @@ public class TestZKPlacementStateManager { assertEquals(ownerships, loadedOwnerships); } - @Test + private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc( + LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue, + int expectedNumServerLoads) throws InterruptedException { + TreeSet<ServerLoad> notification = notificationQueue.take(); + assertNotNull(notification); + while (notification.size() < expectedNumServerLoads) { + notification = notificationQueue.take(); + } + assertEquals(expectedNumServerLoads, notification.size()); + return notification; + } + + @Test(timeout = 60000) public void testWatchIndefinitely() throws Exception { TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); ownerships.add(new ServerLoad("server1")); - PlacementStateManager.PlacementCallback callback = mock(PlacementStateManager.PlacementCallback.class); + final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications = + new LinkedBlockingQueue<TreeSet<ServerLoad>>(); + PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() { + @Override + public void callback(TreeSet<ServerLoad> serverLoads) { + serverLoadNotifications.add(serverLoads); + } + }; zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching zkPlacementStateManager.watch(callback); // cannot verify the callback here as it may call before the verify is called zkPlacementStateManager.saveOwnership(ownerships); - verify(callback, timeout(1000)).callback(ownerships); + assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1)); ServerLoad server2 = new ServerLoad("server2"); server2.addStream(new StreamLoad("hella-important-stream", 415)); ownerships.add(server2); zkPlacementStateManager.saveOwnership(ownerships); - verify(callback, timeout(1000)).callback(ownerships); - - server2.removeStream("server1"); - zkPlacementStateManager.saveOwnership(ownerships); - verify(callback, timeout(1000)).callback(ownerships); + assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2)); } - @Test + @Test(timeout = 60000) public void testZkFormatting() throws Exception { - final String server = "smf1-eci-41-sr1.prod.twitter.com/10.70.186.139:31351"; - final String zkFormattedServer = "smf1-eci-41-sr1.prod.twitter.com--10.70.186.139:31351"; + final String server = "host/10.0.0.0:31351"; + final String zkFormattedServer = "host--10.0.0.0:31351"; URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"); ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));