http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index d7a0ba6..1bfe352 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -28,6 +28,7 @@ import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.StreamUnavailableException; import com.twitter.distributedlog.service.config.NullStreamConfigProvider; import com.twitter.distributedlog.service.config.ServerConfiguration; +import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; import com.twitter.distributedlog.service.stream.WriteOp; import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus; import com.twitter.distributedlog.service.stream.StreamImpl; @@ -140,16 +141,17 @@ public class TestDistributedLogService extends TestDistributedLogBase { converter = new IdentityStreamPartitionConverter(); } return new DistributedLogServiceImpl( - serverConf, - dlConf, - ConfUtils.getConstDynConf(dlConf), - new NullStreamConfigProvider(), - uri, - converter, - new LocalRoutingService(), - NullStatsLogger.INSTANCE, - NullStatsLogger.INSTANCE, - latch); + serverConf, + dlConf, + ConfUtils.getConstDynConf(dlConf), + new NullStreamConfigProvider(), + uri, + converter, + new LocalRoutingService(), + NullStatsLogger.INSTANCE, + NullStatsLogger.INSTANCE, + latch, + new EqualLoadAppraiser()); } private StreamImpl createUnstartedStream(DistributedLogServiceImpl service, @@ -777,21 +779,21 @@ public class TestDistributedLogService extends TestDistributedLogBase { .addHost("stream-0", service.getServiceAddress().getSocketAddress()) .setAllowRetrySameHost(false); - // routing service doesn't know 'stream-1' + service.startPlacementPolicy(); + WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); - assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); + assertEquals(StatusCode.FOUND, response.getHeader().getCode()); + assertEquals(service.getServiceAddress().toString(), + response.getHeader().getLocation()); - // service cache "stream-2" but not acquire + // service cache "stream-2" StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false); - response = FutureUtils.result(service.getOwner("stream-2", new WriteContext())); - assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); - // create write ops to stream-2 to make service acquire the stream WriteOp op = createWriteOp(service, "stream-2", 0L); stream.submit(op); stream.start(); WriteResponse wr = Await.result(op.result()); - assertEquals("Op should succeed", + assertEquals("Op should succeed", StatusCode.SUCCESS, wr.getHeader().getCode()); assertEquals("Service should acquire stream", StreamStatus.INITIALIZED, stream.getStatus()); @@ -804,18 +806,6 @@ public class TestDistributedLogService extends TestDistributedLogBase { assertEquals(StatusCode.FOUND, response.getHeader().getCode()); assertEquals(service.getServiceAddress().toString(), response.getHeader().getLocation()); - - // find the stream from the routing service - response = FutureUtils.result(service.getOwner("stream-0", new WriteContext())); - assertEquals(StatusCode.FOUND, response.getHeader().getCode()); - assertEquals(service.getServiceAddress().toString(), - response.getHeader().getLocation()); - - // add the tried host - WriteContext ctx = new WriteContext(); - ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress())); - response = FutureUtils.result(service.getOwner("stream-0", ctx)); - assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java new file mode 100644 index 0000000..ab4eeae --- /dev/null +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.LinkedHashSet; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.runtime.BoxedUnit; + +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.twitter.distributedlog.client.routing.RoutingService; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.ScheduledThreadPoolTimer; +import com.twitter.util.Time; +import com.twitter.util.Timer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestLeastLoadPlacementPolicy { + + @Test + public void testCalculateBalances() throws Exception { + int numSevers = new Random().nextInt(20) + 1; + int numStreams = new Random().nextInt(200) + 1; + RoutingService mockRoutingService = mock(RoutingService.class); + DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( + new EqualLoadAppraiser(), mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger()); + TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams))); + long lowLoadPerServer = numStreams / numSevers; + long highLoadPerServer = lowLoadPerServer + 1; + for (ServerLoad serverLoad: serverLoads) { + long load = serverLoad.getLoad(); + assertEquals(load, serverLoad.getStreamLoads().size()); + assertTrue(String.format("Load %d is not between %d and %d", load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer); + } + } + + @Test + public void testRefreshAndPlaceStream() throws Exception { + int numSevers = new Random().nextInt(20) + 1; + int numStreams = new Random().nextInt(200) + 1; + RoutingService mockRoutingService = mock(RoutingService.class); + when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers)); + DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + try { + when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator()); + } catch (IOException e) { + fail(); + } + PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class); + LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( + new EqualLoadAppraiser(), mockRoutingService, mockNamespace, mockPlacementStateManager, Duration.fromSeconds(600), new NullStatsLogger()); + leastLoadPlacementPolicy.refresh(); + + final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class); + verify(mockPlacementStateManager).saveOwnership(captor.capture()); + TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>)captor.getValue(); + ServerLoad next = serverLoads.first(); + String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1")); + assertEquals(next.getServer(), serverPlacement); + } + + @Test + public void testCalculateUnequalWeight() throws Exception { + int numSevers = new Random().nextInt(20) + 1; + int numStreams = new Random().nextInt(200) + 1; + /* use AtomicInteger to have a final object in answer method */ + final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE); + RoutingService mockRoutingService = mock(RoutingService.class); + DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class); + when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() { + @Override + public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable { + int load = new Random().nextInt(100000); + if (load > maxLoad.get()) { + maxLoad.set(load); + } + return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load)); + } + }); + LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( + mockLoadAppraiser, mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger()); + TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams))); + long highestLoadSeen = Long.MIN_VALUE; + long lowestLoadSeen = Long.MAX_VALUE; + for (ServerLoad serverLoad: serverLoads) { + long load = serverLoad.getLoad(); + if (load < lowestLoadSeen) { + lowestLoadSeen = load; + } + if (load > highestLoadSeen) { + highestLoadSeen = load; + } + } + assertTrue(highestLoadSeen - lowestLoadSeen < maxLoad.get()); + } + + private Set<SocketAddress> generateSocketAddresses(int num) { + LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>(); + for (int i = 0; i < num; i++) { + socketAddresses.add(new InetSocketAddress(i)); + } + return socketAddresses; + } + + private Set<String> generateStreams(int num) { + LinkedHashSet<String> streams = new LinkedHashSet<String>(); + for (int i = 0; i < num; i++) { + streams.add("stream_" + i); + } + return streams; + } + + private Set<String> generateServers(int num) { + LinkedHashSet<String> servers = new LinkedHashSet<String>(); + for (int i = 0; i < num; i++) { + servers.add("server_" + i); + } + return servers; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java new file mode 100644 index 0000000..bbd7e72 --- /dev/null +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestServerLoad { + + @Test + public void testSerializeDeserialize() throws IOException { + final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); + for (int i = 0; i < 20; i++) { + serverLoad.addStream(new StreamLoad("stream-"+i, i)); + } + assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize())); + } + + @Test + public void testGetLoad() throws IOException { + final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3"); + assertEquals(0, serverLoad.getLoad()); + serverLoad.addStream(new StreamLoad("stream-"+1, 3)); + assertEquals(3, serverLoad.getLoad()); + serverLoad.addStream(new StreamLoad("stream-"+2, 7)); + assertEquals(10, serverLoad.getLoad()); + serverLoad.addStream(new StreamLoad("stream-"+3, 1)); + assertEquals(11, serverLoad.getLoad()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java new file mode 100644 index 0000000..3a3e5c0 --- /dev/null +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestStreamLoad { + + @Test + public void testSerializeDeserialize() throws IOException { + final String streamName = "aHellaRandomStreamName"; + final int load = 1337; + final StreamLoad streamLoad = new StreamLoad(streamName, load); + assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java new file mode 100644 index 0000000..b104952 --- /dev/null +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; +import java.net.URI; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.curator.test.TestingServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.distributedlog.DistributedLogConfiguration; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE; + +public class TestZKPlacementStateManager { + private TestingServer zkTestServer; + private String zkServers; + private URI uri; + private ZKPlacementStateManager zkPlacementStateManager; + + @Before + public void startZookeeper() throws Exception { + zkTestServer = new TestingServer(2181); + zkServers = "127.0.0.1:2181"; + uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"); + zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); + } + + @Test + public void testSaveLoad() throws Exception { + TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); + zkPlacementStateManager.saveOwnership(ownerships); + SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership(); + assertEquals(ownerships, loadedOwnerships); + + ownerships.add(new ServerLoad("emptyServer")); + zkPlacementStateManager.saveOwnership(ownerships); + loadedOwnerships = zkPlacementStateManager.loadOwnership(); + assertEquals(ownerships, loadedOwnerships); + + ServerLoad sl1 = new ServerLoad("server1"); + sl1.addStream(new StreamLoad("stream1", 3)); + sl1.addStream(new StreamLoad("stream2", 4)); + ServerLoad sl2 = new ServerLoad("server2"); + sl2.addStream(new StreamLoad("stream3", 1)); + ownerships.add(sl1); + ownerships.add(sl2); + zkPlacementStateManager.saveOwnership(ownerships); + loadedOwnerships = zkPlacementStateManager.loadOwnership(); + assertEquals(ownerships, loadedOwnerships); + + loadedOwnerships.remove(sl1); + zkPlacementStateManager.saveOwnership(ownerships); + loadedOwnerships = zkPlacementStateManager.loadOwnership(); + assertEquals(ownerships, loadedOwnerships); + } + + @Test + public void testWatchIndefinitely() throws Exception { + TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); + ownerships.add(new ServerLoad("server1")); + PlacementStateManager.PlacementCallback callback = mock(PlacementStateManager.PlacementCallback.class); + zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching + zkPlacementStateManager.watch(callback); + // cannot verify the callback here as it may call before the verify is called + + zkPlacementStateManager.saveOwnership(ownerships); + verify(callback, timeout(1000)).callback(ownerships); + + ServerLoad server2 = new ServerLoad("server2"); + server2.addStream(new StreamLoad("hella-important-stream", 415)); + ownerships.add(server2); + zkPlacementStateManager.saveOwnership(ownerships); + verify(callback, timeout(1000)).callback(ownerships); + + server2.removeStream("server1"); + zkPlacementStateManager.saveOwnership(ownerships); + verify(callback, timeout(1000)).callback(ownerships); + } + + @Test + public void testZkFormatting() throws Exception { + final String server = "smf1-eci-41-sr1.prod.twitter.com/10.70.186.139:31351"; + final String zkFormattedServer = "smf1-eci-41-sr1.prod.twitter.com--10.70.186.139:31351"; + URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"); + ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE); + assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server)); + assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer)); + } + + @After + public void stopZookeeper() throws IOException { + zkTestServer.stop(); + } +}