http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/http/handlers/StatSupplierTestBase.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/http/handlers/StatSupplierTestBase.java b/commons/src/test/java/com/twitter/common/net/http/handlers/StatSupplierTestBase.java new file mode 100644 index 0000000..12926ef --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/http/handlers/StatSupplierTestBase.java @@ -0,0 +1,39 @@ +package com.twitter.common.net.http.handlers; + +import java.util.List; +import java.util.Map; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + +import org.junit.Before; + +import com.twitter.common.stats.Stat; +import com.twitter.common.testing.easymock.EasyMockTest; + +import static org.easymock.EasyMock.expect; + +/** + * @author William Farner + */ +public abstract class StatSupplierTestBase extends EasyMockTest { + + protected Supplier<Iterable<Stat<?>>> statSupplier; + + @Before + public void statSupplierSetUp() { + statSupplier = createMock(new Clazz<Supplier<Iterable<Stat<?>>>>() {}); + } + + protected void expectVarScrape(Map<String, Object> response) { + List<Stat<?>> vars = Lists.newArrayList(); + for (Map.Entry<String, Object> entry : response.entrySet()) { + Stat stat = createMock(Stat.class); + expect(stat.getName()).andReturn(entry.getKey()); + expect(stat.read()).andReturn(entry.getValue()); + vars.add(stat); + } + + expect(statSupplier.get()).andReturn(vars); + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/http/handlers/TimeSeriesDataSourceTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/http/handlers/TimeSeriesDataSourceTest.java b/commons/src/test/java/com/twitter/common/net/http/handlers/TimeSeriesDataSourceTest.java new file mode 100644 index 0000000..8958bd6 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/http/handlers/TimeSeriesDataSourceTest.java @@ -0,0 +1,159 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.http.handlers; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.collections.Iterables2; +import com.twitter.common.net.http.handlers.TimeSeriesDataSource.ResponseStruct; +import com.twitter.common.stats.TimeSeries; +import com.twitter.common.stats.TimeSeriesRepository; +import com.twitter.common.testing.easymock.EasyMockTest; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class TimeSeriesDataSourceTest extends EasyMockTest { + + private static final String TIME_COLUMN = TimeSeriesDataSource.TIME_METRIC; + private static final String TIME_SERIES_1 = "time_series_1"; + private static final String TIME_SERIES_2 = "time_series_2"; + + private static final List<Number> TIMESTAMPS = Arrays.<Number>asList(1d, 2d, 3d, 4d); + private static final Map<String, TimeSeries> TS_DATA = ImmutableMap.of( + TIME_SERIES_1, makeTimeSeries(TIME_SERIES_1, 1, 2, 3, 4), + TIME_SERIES_2, makeTimeSeries(TIME_SERIES_2, 0, 0, 0, 0) + ); + + private final Gson gson = new Gson(); + + private TimeSeriesDataSource dataSource; + private TimeSeriesRepository timeSeriesRepo; + + @Before + public void setUp() { + timeSeriesRepo = createMock(TimeSeriesRepository.class); + dataSource = new TimeSeriesDataSource(timeSeriesRepo); + } + + @Test + public void testGetColumns() throws Exception { + expect(timeSeriesRepo.getAvailableSeries()).andReturn(TS_DATA.keySet()); + + control.replay(); + + List<String> columns = gson.fromJson( + dataSource.getResponse(null, null), + new TypeToken<List<String>>() { }.getType()); + assertEquals(ImmutableList.copyOf(TS_DATA.keySet()), columns); + } + + @Test + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. + public void testGetAllData() throws Exception { + expect(timeSeriesRepo.getTimestamps()).andReturn(TIMESTAMPS); + expect(timeSeriesRepo.get(TIME_SERIES_1)).andReturn(TS_DATA.get(TIME_SERIES_1)); + expect(timeSeriesRepo.get(TIME_SERIES_2)).andReturn(TS_DATA.get(TIME_SERIES_2)); + + control.replay(); + + String colString = Joiner.on(',').join( + Arrays.asList(TIME_SERIES_1, TIME_SERIES_2, TIME_COLUMN)); + + ResponseStruct response = gson.fromJson( + dataSource.getResponse(colString, null), + ResponseStruct.class); + + assertEquals(ImmutableList.of(TIME_COLUMN, TIME_SERIES_1, TIME_SERIES_2), response.names); + Iterable<List<Number>> expectedData = Iterables2.zip(0, + TIMESTAMPS, getSamples(TIME_SERIES_1), getSamples(TIME_SERIES_2)); + checkRows(expectedData, response.data); + } + + @Test + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. + public void testFilterByTime() throws Exception { + expect(timeSeriesRepo.getTimestamps()).andReturn(TIMESTAMPS); + expect(timeSeriesRepo.get(TIME_SERIES_1)).andReturn(TS_DATA.get(TIME_SERIES_1)); + expect(timeSeriesRepo.get(TIME_SERIES_2)).andReturn(TS_DATA.get(TIME_SERIES_2)); + + control.replay(); + + String colString = Joiner.on(',').join( + Arrays.asList(TIME_SERIES_1, TIME_SERIES_2, TIME_COLUMN)); + + ResponseStruct response = gson.fromJson( + dataSource.getResponse(colString, "2"), + ResponseStruct.class); + + Iterable<List<Number>> expectedData = Iterables2.zip(0, + TIMESTAMPS, getSamples(TIME_SERIES_1), getSamples(TIME_SERIES_2)); + expectedData = Iterables.filter(expectedData, new Predicate<List<Number>>() { + @Override public boolean apply(List<Number> row) { + return row.get(0).intValue() >= 3; + } + }); + + checkRows(expectedData, response.data); + } + + private void checkRows(Iterable<List<Number>> expected, List<List<Number>> actual) { + assertEquals(Iterables.size(expected), actual.size()); + Iterator<List<Number>> actualIterator = actual.iterator(); + for (List<Number> expectedRow : expected) { + Iterator<Number> actualValueIterator = actualIterator.next().iterator(); + for (Number expectedValue : expectedRow) { + assertEquals("Expected row data " + expected + ", found " + actual, + expectedValue.doubleValue(), + actualValueIterator.next().doubleValue(), + 1e-9); + } + } + } + + private static Iterable<Number> getSamples(String tsName) { + return TS_DATA.get(tsName).getSamples(); + } + + private static TimeSeries makeTimeSeries(final String name, final Number... values) { + final List<Number> samples = Lists.newArrayList(); + for (Number value : values) samples.add(value.doubleValue()); + + return new TimeSeries() { + @Override public String getName() { return name; } + + @Override public Iterable<Number> getSamples() { + return samples; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/http/handlers/VarsHandlerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/http/handlers/VarsHandlerTest.java b/commons/src/test/java/com/twitter/common/net/http/handlers/VarsHandlerTest.java new file mode 100644 index 0000000..f8cedc2 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/http/handlers/VarsHandlerTest.java @@ -0,0 +1,65 @@ +package com.twitter.common.net.http.handlers; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.stats.Stat; + +import static org.junit.Assert.assertEquals; + +/** + * @author William Farner + */ +public class VarsHandlerTest extends StatSupplierTestBase { + + private VarsHandler vars; + private HttpServletRequest request; + + @Before + public void setUp() { + statSupplier = createMock(new Clazz<Supplier<Iterable<Stat<?>>>>() {}); + request = createMock(HttpServletRequest.class); + vars = new VarsHandler(statSupplier); + } + + @Test + public void testGetEmpty() { + expectVarScrape(ImmutableMap.<String, Object>of()); + + control.replay(); + + checkOutput(Collections.<String>emptyList()); + } + + @Test + public void testGet() { + expectVarScrape(ImmutableMap.<String, Object>of( + "float", 4.16126, + "int", 5, + "str", "foobar" + )); + + control.replay(); + + // expect the output to be in the same order + checkOutput(Arrays.asList( + "float 4.16126", + "int 5", + "str foobar")); + } + + private void checkOutput(List<String> expectedLines) { + assertEquals(expectedLines, + ImmutableList.copyOf(vars.getLines(request))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/http/handlers/VarsJsonHandlerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/http/handlers/VarsJsonHandlerTest.java b/commons/src/test/java/com/twitter/common/net/http/handlers/VarsJsonHandlerTest.java new file mode 100644 index 0000000..de9c2e5 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/http/handlers/VarsJsonHandlerTest.java @@ -0,0 +1,60 @@ +package com.twitter.common.net.http.handlers; + +import com.google.common.collect.ImmutableMap; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * @author William Farner + */ +public class VarsJsonHandlerTest extends StatSupplierTestBase { + + private VarsJsonHandler varsJson; + + @Before + public void setUp() { + varsJson = new VarsJsonHandler(statSupplier); + } + + @Test + public void testGetEmpty() { + expectVarScrape(ImmutableMap.<String, Object>of()); + + control.replay(); + + assertEquals("{}", varsJson.getBody(false)); + } + + @Test + public void testGet() { + expectVarScrape(ImmutableMap.<String, Object>of( + "str", "foobar", + "int", 5, + "float", 4.16126 + )); + + control.replay(); + + assertEquals("{\"str\":\"foobar\",\"int\":5,\"float\":4.16126}", varsJson.getBody(false)); + } + + @Test + public void testGetPretty() { + expectVarScrape(ImmutableMap.<String, Object>of( + "str", "foobar", + "int", 5, + "float", 4.16126 + )); + + control.replay(); + + assertEquals("{\n" + + " \"str\": \"foobar\",\n" + + " \"int\": 5,\n" + + " \"float\": 4.16126\n" + + "}", varsJson.getBody(true)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategyTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategyTest.java new file mode 100644 index 0000000..d4e9778 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/LeastConnectedStrategyTest.java @@ -0,0 +1,289 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import java.util.Collection; + +import com.google.common.collect.ImmutableSet; + +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.Closure; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.testing.easymock.EasyMockTest; + +import static org.easymock.EasyMock.capture; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * @author William Farner + */ +public class LeastConnectedStrategyTest extends EasyMockTest { + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + private static final String BACKEND_3 = "backend3"; + private static final String BACKEND_4 = "backend4"; + + private Closure<Collection<String>> onBackendsChosen; + + private LoadBalancingStrategy<String> leastCon; + + @Before + public void setUp() { + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + leastCon = new LeastConnectedStrategy<String>(); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoBackends() throws ResourceExhaustedException { + control.replay(); + + leastCon.nextBackend(); + } + + @Test(expected = ResourceExhaustedException.class) + public void testEmptyBackends() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(); + + leastCon.nextBackend(); + } + + @Test + public void testPicksLeastConnected() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + + connect(BACKEND_1, 1); + connect(BACKEND_2, 2); + connect(BACKEND_3, 3); + assertThat(leastCon.nextBackend(), is(BACKEND_1)); + + connect(BACKEND_1, 2); + assertThat(leastCon.nextBackend(), is(BACKEND_2)); + } + + @Test + public void testPicksUnconnected() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + connect(BACKEND_1, 1); + connect(BACKEND_2, 2); + + assertThat(leastCon.nextBackend(), is(BACKEND_3)); + } + + @Test + @SuppressWarnings("unchecked") // Needed because type information lost in varargs. + public void testHandlesEqualCount() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + connect(BACKEND_1, 5); + connect(BACKEND_2, 5); + connect(BACKEND_3, 5); + + assertTrue(ImmutableSet.of(BACKEND_1, BACKEND_2, BACKEND_3).contains(leastCon.nextBackend())); + } + + @Test + public void testReranks() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + connect(BACKEND_1, 10); + connect(BACKEND_2, 5); + connect(BACKEND_3, 5); + + disconnect(BACKEND_1, 6); + + assertThat(leastCon.nextBackend(), is(BACKEND_1)); + } + + @Test + public void testUsesAllBackends_success() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + ImmutableSet<String> allBackends = ImmutableSet.of(BACKEND_1, BACKEND_2, BACKEND_3); + backendOfferExpectation.offerBackends(allBackends); + + ImmutableSet.Builder<String> usedBackends = ImmutableSet.builder(); + for (int i = 0; i < allBackends.size(); i++) { + String backend = leastCon.nextBackend(); + usedBackends.add(backend); + connect(backend, 1); + disconnect(backend, 1); + } + + assertThat(usedBackends.build(), is(allBackends)); + } + + @Test + public void UsesAllBackends_mixed() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3, BACKEND_4); + + connect(BACKEND_1, ConnectionResult.FAILED, 1); + assertThat(leastCon.nextBackend(), is(BACKEND_2)); + + connect(BACKEND_2, ConnectionResult.FAILED, 1); + assertThat(leastCon.nextBackend(), is(BACKEND_3)); + + connect(BACKEND_3, 1); + assertThat(leastCon.nextBackend(), is(BACKEND_4)); + + connect(BACKEND_4, 1); + + // Now we should rotate around to the front and give the connection failure another try. + assertThat(leastCon.nextBackend(), is(BACKEND_1)); + } + + @Test + public void testUsesAllBackends_failure() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + ImmutableSet<String> allBackends = ImmutableSet.of(BACKEND_1, BACKEND_2, BACKEND_3); + backendOfferExpectation.offerBackends(allBackends); + + ImmutableSet.Builder<String> usedBackends = ImmutableSet.builder(); + for (int i = 0; i < allBackends.size(); i++) { + String backend = leastCon.nextBackend(); + usedBackends.add(backend); + connect(backend, ConnectionResult.FAILED, 1); + } + + assertThat(usedBackends.build(), is(allBackends)); + } + + @Test + public void testUsedLeastExhausted() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + connect(BACKEND_1, 10); + disconnect(BACKEND_1, 10); + connect(BACKEND_3, 5); + disconnect(BACKEND_3, 5); + + assertThat(leastCon.nextBackend(), is(BACKEND_2)); + } + + @Test + public void testNoNegativeCounts() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = new BackendOfferExpectation(); + control.replay(); + + backendOfferExpectation.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + connect(BACKEND_1, 1); + connect(BACKEND_3, 1); + + // If there was a bug allowing connection count to go negative, BACKEND_1 would be chosen, + // but if it floors at zero, BACKEND_2 will be the lowest. + disconnect(BACKEND_1, 5); + } + + @Test + public void testForgetsOldBackends() throws ResourceExhaustedException { + BackendOfferExpectation offer1 = new BackendOfferExpectation(); + BackendOfferExpectation offer2 = new BackendOfferExpectation(); + BackendOfferExpectation offer3 = new BackendOfferExpectation(); + control.replay(); + + offer1.offerBackends(BACKEND_1, BACKEND_2); + connect(BACKEND_2, 10); + + offer2.offerBackends(BACKEND_2, BACKEND_3); + connect(BACKEND_3, 1); + assertThat(leastCon.nextBackend(), is(BACKEND_3)); + + offer3.offerBackends(BACKEND_2); + assertThat(leastCon.nextBackend(), is(BACKEND_2)); + } + + @Test + public void testAccountingSurvivesBackendChange() throws ResourceExhaustedException { + BackendOfferExpectation offer1 = new BackendOfferExpectation(); + BackendOfferExpectation offer2 = new BackendOfferExpectation(); + control.replay(); + + offer1.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3, BACKEND_4); + connect(BACKEND_1, 10); + connect(BACKEND_2, 8); + connect(BACKEND_3, 9); + assertThat(leastCon.nextBackend(), is(BACKEND_4)); + + offer2.offerBackends(BACKEND_1, BACKEND_2, BACKEND_3); + assertThat(leastCon.nextBackend(), is(BACKEND_2)); + } + + private void connect(String backend, int count) { + connect(backend, ConnectionResult.SUCCESS, count); + } + + private void connect(String backend, ConnectionResult result, int count) { + for (int i = 0; i < count; i++) { + leastCon.addConnectResult(backend, result, 0L); + } + } + + private void disconnect(String backend, int count) { + for (int i = 0; i < count; i++) { + leastCon.connectionReturned(backend); + } + } + + private class BackendOfferExpectation { + private final Capture<Collection<String>> chosenBackends; + + private BackendOfferExpectation() { + chosenBackends = createCapture(); + onBackendsChosen.execute(capture(chosenBackends)); + } + + void offerBackends(String... backends) { + offerBackends(ImmutableSet.copyOf(backends)); + } + + void offerBackends(ImmutableSet<String> backends) { + leastCon.offerBackends(backends, onBackendsChosen); + + assertTrue(chosenBackends.hasCaptured()); + assertEquals(backends, ImmutableSet.copyOf(chosenBackends.getValue())); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/LoadBalancerImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/LoadBalancerImplTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/LoadBalancerImplTest.java new file mode 100644 index 0000000..751836f --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/LoadBalancerImplTest.java @@ -0,0 +1,144 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * @author William Farner + */ +public class LoadBalancerImplTest extends EasyMockTest { + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + + private LoadBalancingStrategy<String> strategy; + private Closure<Collection<String>> onBackendsChosen; + + private LoadBalancer<String> loadBalancer; + + @Before + public void setUp() { + strategy = createMock(new Clazz<LoadBalancingStrategy<String>>() {}); + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + loadBalancer = LoadBalancerImpl.create(this.strategy); + } + + @Test + public void testForwardsBasicCalls() throws ResourceExhaustedException { + BackendOfferExpectation backendOfferExpectation = + new BackendOfferExpectation(BACKEND_1, BACKEND_2); + expect(strategy.nextBackend()).andReturn(BACKEND_1); + strategy.addConnectResult(BACKEND_1, ConnectionResult.SUCCESS, 0L); + strategy.connectionReturned(BACKEND_1); + strategy.addConnectResult(BACKEND_1, ConnectionResult.TIMEOUT, 0L); + + control.replay(); + + backendOfferExpectation.simulateBackendsChosen(); + + assertThat(loadBalancer.nextBackend(), is(BACKEND_1)); + loadBalancer.connected(BACKEND_1, 0L); + loadBalancer.released(BACKEND_1); + loadBalancer.connectFailed(BACKEND_1, ConnectionResult.TIMEOUT); + } + + @Test + public void testHandlesUnknownBackend() { + BackendOfferExpectation first = new BackendOfferExpectation(BACKEND_1, BACKEND_2); + BackendOfferExpectation second = new BackendOfferExpectation(BACKEND_1); + + strategy.addConnectResult(BACKEND_1, ConnectionResult.SUCCESS, 0L); + strategy.connectionReturned(BACKEND_1); + + BackendOfferExpectation third = new BackendOfferExpectation(BACKEND_1, BACKEND_2); + + strategy.addConnectResult(BACKEND_1, ConnectionResult.SUCCESS, 0L); + strategy.addConnectResult(BACKEND_2, ConnectionResult.SUCCESS, 0L); + + BackendOfferExpectation fourth = new BackendOfferExpectation(BACKEND_1); + + strategy.addRequestResult(BACKEND_1, RequestResult.SUCCESS, 0L); + strategy.connectionReturned(BACKEND_1); + + control.replay(); + + first.simulateBackendsChosen(); + second.simulateBackendsChosen(); + + loadBalancer.connected(BACKEND_1, 0L); + loadBalancer.released(BACKEND_1); + + // Release an unrecognized connection, should not propagate to strategy. + loadBalancer.released("foo"); + + // Requests related to BACKEND_2 are not forwarded. + loadBalancer.connected(BACKEND_2, 0L); + loadBalancer.connectFailed(BACKEND_2, ConnectionResult.FAILED); + loadBalancer.requestResult(BACKEND_2, RequestResult.SUCCESS, 0L); + loadBalancer.released(BACKEND_2); + + third.simulateBackendsChosen(); + loadBalancer.connected(BACKEND_1, 0L); + loadBalancer.connected(BACKEND_2, 0L); + fourth.simulateBackendsChosen(); + loadBalancer.requestResult(BACKEND_1, RequestResult.SUCCESS, 0L); + loadBalancer.requestResult(BACKEND_2, RequestResult.SUCCESS, 0L); + loadBalancer.released(BACKEND_1); + loadBalancer.released(BACKEND_2); + } + + private class BackendOfferExpectation { + private final Set<String> backends; + private final Capture<Closure<Collection<String>>> onBackendsChosenCapture; + + private BackendOfferExpectation(String ... backends) { + this.backends = Sets.newHashSet(backends); + onBackendsChosenCapture = createCapture(); + + strategy.offerBackends(eq(this.backends), capture(onBackendsChosenCapture)); + onBackendsChosen.execute(this.backends); + } + + void simulateBackendsChosen() { + loadBalancer.offerBackends(backends, onBackendsChosen); + assertTrue(onBackendsChosenCapture.hasCaptured()); + + // Simulate the strategy notifying LoadBalancer's callback of a backend choice + onBackendsChosenCapture.getValue().execute(backends); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyTest.java new file mode 100644 index 0000000..032ecd9 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyTest.java @@ -0,0 +1,335 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import com.google.common.base.Function; +import com.google.common.collect.Sets; +import com.google.common.base.Predicate; +import com.twitter.common.base.Closure; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; +import com.twitter.common.util.BackoffDecider; +import com.twitter.common.util.Random; +import com.twitter.common.util.TruncatedBinaryBackoff; +import com.twitter.common.util.testing.FakeClock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; + +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author William Farner + */ +public class MarkDeadStrategyTest extends EasyMockTest { + + private static final Amount<Long, Time> INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS); + private static final Amount<Long, Time> MAX_BACKOFF = Amount.of(10L, Time.SECONDS); + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + + private LoadBalancingStrategy<String> wrappedStrategy; + private Closure<Collection<String>> onBackendsChosen; + private Predicate<String> mockHostChecker; + + private LoadBalancingStrategy<String> markDead; + private Random random; + private FakeClock clock; + + @Before + public void setUp() { + wrappedStrategy = createMock(new Clazz<LoadBalancingStrategy<String>>() {}); + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + mockHostChecker = createMock(new Clazz<Predicate<String>>() {}); + + random = createMock(Random.class); + clock = new FakeClock(); + + Function<String, BackoffDecider> backoffFactory = + new Function<String, BackoffDecider>() { + @Override public BackoffDecider apply(String s) { + return BackoffDecider.builder(s) + .withSeedSize(1) + .withClock(clock) + .withRandom(random) + .withTolerateFailureRate(0.5) + .withStrategy(new TruncatedBinaryBackoff(INITIAL_BACKOFF, MAX_BACKOFF)) + // This recovery type is suggested for load balancer strategies to prevent + // connection pool churn that would occur from the random linear recovery type. + .withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY) + .withRequestWindow(MAX_BACKOFF) + .build(); + } + }; + + markDead = new MarkDeadStrategy<String>(wrappedStrategy, backoffFactory, mockHostChecker); + + } + + @After + public void verify() { + control.verify(); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoBackends() throws ResourceExhaustedException { + expect(wrappedStrategy.nextBackend()).andThrow(new ResourceExhaustedException("No backends.")); + + control.replay(); + + markDead.nextBackend(); + } + + @Test + public void testForwardsBasicCalls() throws ResourceExhaustedException { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expect(wrappedStrategy.nextBackend()).andReturn(BACKEND_1); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + assertThat(markDead.nextBackend(), is(BACKEND_1)); + } + + @Test + public void testAllHealthy() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_1, RequestResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_2, RequestResult.SUCCESS, 10); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + request(BACKEND_1, RequestResult.SUCCESS, connect(BACKEND_1, ConnectionResult.SUCCESS, 10)); + request(BACKEND_2, RequestResult.SUCCESS, connect(BACKEND_2, ConnectionResult.SUCCESS, 10)); + } + + @Test + public void testOneFailingConnections() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 4); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 4); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 4); + connect(BACKEND_2, ConnectionResult.FAILED, 10); + } + + @Test + public void testOneFailingRequests() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_1, RequestResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_2, RequestResult.SUCCESS, 10); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_1, RequestResult.FAILED, 30); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_2), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + request(BACKEND_1, RequestResult.SUCCESS, connect(BACKEND_1, ConnectionResult.SUCCESS, 10)); + request(BACKEND_2, RequestResult.SUCCESS, connect(BACKEND_2, ConnectionResult.SUCCESS, 10)); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + request(BACKEND_1, RequestResult.FAILED, 50); + } + + @Test + public void testOneTimingOut() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_1, RequestResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_2, RequestResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + expectRequest(BACKEND_2, RequestResult.TIMEOUT, 30); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + request(BACKEND_1, RequestResult.SUCCESS, connect(BACKEND_1, ConnectionResult.SUCCESS, 10)); + request(BACKEND_2, RequestResult.SUCCESS, connect(BACKEND_2, ConnectionResult.SUCCESS, 10)); + connect(BACKEND_2, ConnectionResult.SUCCESS, 10); + request(BACKEND_2, RequestResult.TIMEOUT, 50); + } + + @Test + public void testFailingRecovers() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 4); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 4); + + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 9); + + expect(mockHostChecker.apply(BACKEND_2)).andReturn(true); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 4); + connect(BACKEND_2, ConnectionResult.FAILED, 5); + + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + clock.advance(INITIAL_BACKOFF); // Wait for recovery period to expire. + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + connect(BACKEND_2, ConnectionResult.SUCCESS, 9); + } + + @Test + public void testFailingServerWithLiveHostChecker() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 4); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 4); + + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + + expect(mockHostChecker.apply(BACKEND_2)).andReturn(false); + + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 5); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + + expect(mockHostChecker.apply(BACKEND_2)).andReturn(true); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 4); + connect(BACKEND_2, ConnectionResult.FAILED, 5); + + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + clock.advance(INITIAL_BACKOFF); // Wait for recovery period to expire. + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + clock.advance(INITIAL_BACKOFF); // Wait for recovery period to expire. + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + connect(BACKEND_2, ConnectionResult.SUCCESS, 10); + } + + @Test + public void testAllDead() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_1, ConnectionResult.FAILED, 10); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 10); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 5); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 10); + connect(BACKEND_1, ConnectionResult.FAILED, 15); + connect(BACKEND_2, ConnectionResult.FAILED, 15); + } + + @Test + public void testRecoversFromForcedLiveMode() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 5); + expectConnected(BACKEND_1, ConnectionResult.FAILED, 5); // Backend 1 starts backing off. + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_2), onBackendsChosen); + + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 5); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 5); // Backend 2 starts backing off. + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 5); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 5); // Backend 2 starts backing off. + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + connect(BACKEND_1, ConnectionResult.SUCCESS, 5); + connect(BACKEND_1, ConnectionResult.FAILED, 6); // BACKEND_1 gets marked as dead. + + connect(BACKEND_2, ConnectionResult.SUCCESS, 5); + connect(BACKEND_2, ConnectionResult.FAILED, 6); // All now marked dead, forced into live mode. + + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + connect(BACKEND_2, ConnectionResult.SUCCESS, 5); + connect(BACKEND_2, ConnectionResult.FAILED, 5); // BACKEND_2 marked as dead. + } + + private int connect(String backend, ConnectionResult result, int count) { + for (int i = 0; i < count; i++) { + markDead.addConnectResult(backend, result, 0L); + } + return count; + } + + private void request(String backend, RequestResult result, int count) { + for (int i = 0; i < count; i++) { + markDead.addRequestResult(backend, result, 0L); + } + } + + private void expectConnected(String backend, ConnectionResult result, int count) { + for (int i = 0; i < count; i++) { + wrappedStrategy.addConnectResult(backend, result, 0L); + } + } + + private void expectRequest(String backend, RequestResult result, int count) { + for (int i = 0; i < count; i++) { + wrappedStrategy.addRequestResult(backend, result, 0L); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheckTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheckTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheckTest.java new file mode 100644 index 0000000..6014ce2 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/MarkDeadStrategyWithHostCheckTest.java @@ -0,0 +1,149 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import java.util.Collection; + +import com.google.common.base.Function; +import com.google.common.collect.Sets; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.Closure; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.BackoffDecider; +import com.twitter.common.util.Random; +import com.twitter.common.util.TruncatedBinaryBackoff; +import com.twitter.common.util.testing.FakeClock; + +/** + * @author Krishna Gade + */ +public class MarkDeadStrategyWithHostCheckTest extends EasyMockTest { + + private static final Amount<Long, Time> INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS); + private static final Amount<Long, Time> MAX_BACKOFF = Amount.of(10L, Time.SECONDS); + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + + private LoadBalancingStrategy<String> wrappedStrategy; + private Closure<Collection<String>> onBackendsChosen; + + private LoadBalancingStrategy<String> markDead; + private Random random; + private FakeClock clock; + + @Before + public void setUp() { + wrappedStrategy = createMock(new Clazz<LoadBalancingStrategy<String>>() {}); + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + random = createMock(Random.class); + clock = new FakeClock(); + + Function<String, BackoffDecider> backoffFactory = + new Function<String, BackoffDecider>() { + @Override public BackoffDecider apply(String s) { + return BackoffDecider.builder(s) + .withSeedSize(1) + .withClock(clock) + .withRandom(random) + .withTolerateFailureRate(0.5) + .withStrategy(new TruncatedBinaryBackoff(INITIAL_BACKOFF, MAX_BACKOFF)) + // This recovery type is suggested for load balancer strategies to prevent + // connection pool churn that would occur from the random linear recovery type. + .withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY) + .withRequestWindow(MAX_BACKOFF) + .build(); + } + }; + + markDead = new MarkDeadStrategyWithHostCheck<String>(wrappedStrategy, backoffFactory); + + } + + @After + public void verify() { + control.verify(); + } + + @Test + public void testDeadHost() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 4); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 4); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 1); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 4); + connect(BACKEND_2, ConnectionResult.FAILED, 10); + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + clock.advance(INITIAL_BACKOFF); // Wait for recovery period to expire. + connect(BACKEND_2, ConnectionResult.SUCCESS, 1); + } + + @Test + public void testDeadHostWithMaxBackOff() { + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + expectConnected(BACKEND_1, ConnectionResult.SUCCESS, 10); + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 4); + expectConnected(BACKEND_2, ConnectionResult.FAILED, 4); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1), onBackendsChosen); + + expectConnected(BACKEND_2, ConnectionResult.SUCCESS, 1); + wrappedStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + + control.replay(); + + markDead.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2), onBackendsChosen); + connect(BACKEND_1, ConnectionResult.SUCCESS, 10); + connect(BACKEND_2, ConnectionResult.SUCCESS, 4); + connect(BACKEND_2, ConnectionResult.FAILED, 10); + clock.advance(INITIAL_BACKOFF); // Wait for backoff period to expire. + clock.waitFor(1); + clock.advance(INITIAL_BACKOFF); // Wait for recovery period to expire. + clock.advance(MAX_BACKOFF); // Wait for recovery period to expire. + connect(BACKEND_2, ConnectionResult.SUCCESS, 1); + } + + private int connect(String backend, ConnectionResult result, int count) { + for (int i = 0; i < count; i++) { + markDead.addConnectResult(backend, result, 0L); + } + return count; + } + + private void expectConnected(String backend, ConnectionResult result, int count) { + for (int i = 0; i < count; i++) { + wrappedStrategy.addConnectResult(backend, result, 0L); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/RandomStrategyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/RandomStrategyTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/RandomStrategyTest.java new file mode 100644 index 0000000..6385f70 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/RandomStrategyTest.java @@ -0,0 +1,99 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import java.util.Collection; +import java.util.HashSet; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.Closure; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.testing.easymock.EasyMockTest; + +import static org.easymock.EasyMock.capture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author William Farner + */ +public class RandomStrategyTest extends EasyMockTest { + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + private static final String BACKEND_3 = "backend3"; + + private Closure<Collection<String>> onBackendsChosen; + + private LoadBalancingStrategy<String> randomStrategy; + + @Before + public void setUp() { + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + randomStrategy = new RandomStrategy<String>(); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoBackends() throws ResourceExhaustedException { + control.replay(); + + randomStrategy.nextBackend(); + } + + @Test + public void testEmptyBackends() throws ResourceExhaustedException { + Capture<Collection<String>> capture = createCapture(); + onBackendsChosen.execute(capture(capture)); + control.replay(); + + randomStrategy.offerBackends(Sets.<String>newHashSet(), onBackendsChosen); + + try { + randomStrategy.nextBackend(); + fail("Expected ResourceExhaustedException to be thrown"); + } catch (ResourceExhaustedException e) { + // expected + } + + assertTrue(capture.hasCaptured()); + assertTrue(capture.getValue().isEmpty()); + } + + @Test + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. + public void testRandomSelection() throws ResourceExhaustedException { + Capture<Collection<String>> capture = createCapture(); + onBackendsChosen.execute(capture(capture)); + control.replay(); + + HashSet<String> backends = Sets.newHashSet(BACKEND_1, BACKEND_2, BACKEND_3); + randomStrategy.offerBackends(backends, onBackendsChosen); + + assertTrue(ImmutableSet.of(BACKEND_1, BACKEND_2, BACKEND_3) + .contains(randomStrategy.nextBackend())); + assertTrue(capture.hasCaptured()); + assertEquals(backends, Sets.newHashSet(capture.getValue())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/RoundRobinStrategyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/RoundRobinStrategyTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/RoundRobinStrategyTest.java new file mode 100644 index 0000000..f58addb --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/RoundRobinStrategyTest.java @@ -0,0 +1,107 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.net.pool.ResourceExhaustedException; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.easymock.EasyMock.capture; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author William Farner + */ +public class RoundRobinStrategyTest extends EasyMockTest { + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + private static final String BACKEND_3 = "backend3"; + + private Closure<Collection<String>> onBackendsChosen; + + private LoadBalancingStrategy<String> roundRobin; + + @Before + public void setUp() { + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + roundRobin = new RoundRobinStrategy<String>(); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoBackends() throws ResourceExhaustedException { + control.replay(); + + roundRobin.nextBackend(); + } + + @Test + public void testEmptyBackends() throws ResourceExhaustedException { + Capture<Collection<String>> capture = createCapture(); + onBackendsChosen.execute(capture(capture)); + control.replay(); + + roundRobin.offerBackends(Sets.<String>newHashSet(), onBackendsChosen); + + try { + roundRobin.nextBackend(); + fail("Expected ResourceExhaustedException to be thrown"); + } catch (ResourceExhaustedException e) { + // expected + } + + assertTrue(capture.hasCaptured()); + assertTrue(capture.getValue().isEmpty()); + } + + @Test + public void testConsistentOrdering() throws ResourceExhaustedException { + Capture<Collection<String>> capture = createCapture(); + onBackendsChosen.execute(capture(capture)); + control.replay(); + + HashSet<String> backends = Sets.newHashSet(BACKEND_1, BACKEND_2, BACKEND_3); + roundRobin.offerBackends(backends, onBackendsChosen); + Set<String> iteration1 = Sets.newHashSet( + roundRobin.nextBackend(), + roundRobin.nextBackend(), + roundRobin.nextBackend() + ); + Set<String> iteration2 = Sets.newHashSet( + roundRobin.nextBackend(), + roundRobin.nextBackend(), + roundRobin.nextBackend() + ); + + assertThat(iteration2, is(iteration1)); + assertTrue(capture.hasCaptured()); + assertEquals(backends, Sets.newHashSet(capture.getValue())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/loadbalancing/SubsetStrategyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/loadbalancing/SubsetStrategyTest.java b/commons/src/test/java/com/twitter/common/net/loadbalancing/SubsetStrategyTest.java new file mode 100644 index 0000000..e879459 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/loadbalancing/SubsetStrategyTest.java @@ -0,0 +1,101 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.loadbalancing; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.net.loadbalancing.RequestTracker.RequestResult; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author William Farner + */ +public class SubsetStrategyTest extends EasyMockTest { + + private static final String BACKEND_1 = "backend1"; + private static final String BACKEND_2 = "backend2"; + private static final String BACKEND_3 = "backend3"; + + private Closure<Collection<String>> onBackendsChosen; + private LoadBalancingStrategy<String> wrappedStrategy; + + private LoadBalancingStrategy<String> subsetStrategy; + + @Before + public void setUp() { + wrappedStrategy = createMock(new Clazz<LoadBalancingStrategy<String>>() {}); + onBackendsChosen = createMock(new Clazz<Closure<Collection<String>>>() {}); + + subsetStrategy = new SubsetStrategy<String>(2, wrappedStrategy); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoBackends() throws ResourceExhaustedException { + expect(wrappedStrategy.nextBackend()).andThrow(new ResourceExhaustedException("No backends.")); + + control.replay(); + + subsetStrategy.nextBackend(); + } + + @Test + public void testForwardsSubsetBackends() { + Capture<Set<String>> backendCapture = createCapture(); + wrappedStrategy.offerBackends(capture(backendCapture), eq(onBackendsChosen)); + control.replay(); + + subsetStrategy.offerBackends(Sets.newHashSet(BACKEND_1, BACKEND_2, BACKEND_3), + onBackendsChosen); + + assertThat(backendCapture.getValue().size(), is(2)); + } + + @Test + public void testForwardsOnlySubsetRequests() { + Capture<Set<String>> backendCapture = createCapture(); + wrappedStrategy.offerBackends(capture(backendCapture), eq(onBackendsChosen)); + + control.replay(); + + Set<String> allBackends = Sets.newHashSet(BACKEND_1, BACKEND_2, BACKEND_3); + subsetStrategy.offerBackends(allBackends, onBackendsChosen); + Set<String> backends = backendCapture.getValue(); + assertThat(backends.size(), is(2)); + + // One backend should have been unused, makes sure the appropriate calls are ignored for it. + String unusedBackend = Iterables.getOnlyElement(Sets.difference(allBackends, backends)); + subsetStrategy.addRequestResult(unusedBackend, RequestResult.SUCCESS, 0L); + subsetStrategy.addConnectResult(unusedBackend, ConnectionResult.FAILED, 0L); + subsetStrategy.connectionReturned(unusedBackend); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/monitoring/TrafficMonitorTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/monitoring/TrafficMonitorTest.java b/commons/src/test/java/com/twitter/common/net/monitoring/TrafficMonitorTest.java new file mode 100644 index 0000000..c4eb18d --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/monitoring/TrafficMonitorTest.java @@ -0,0 +1,125 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.monitoring; + +import com.twitter.common.net.loadbalancing.RequestTracker; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.testing.FakeClock; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +/** + * @author William Farner + */ +public class TrafficMonitorTest { + + private final String HOST_A = "hostA"; + private final String HOST_B = "hostB"; + + private FakeClock clock; + private TrafficMonitor<String> monitor; + + @Before + public void setUp() { + clock = new FakeClock(); + monitor = new TrafficMonitor<String>("test service", clock); + } + + @Test + public void testBasicFlow() { + monitor.connected(HOST_A); + addSuccess(HOST_A); + + verifyConnections(HOST_A, 1); + verifyRequests(HOST_A, 1); + + monitor.released(HOST_A); + + verifyConnections(HOST_A, 0); + verifyRequests(HOST_A, 1); + verifyLifetimeRequests(1); + } + + @Test + public void testOutOfOrder() { + addSuccess(HOST_A); + monitor.connected(HOST_A); + + verifyConnections(HOST_A, 1); + verifyRequests(HOST_A, 1); + verifyLifetimeRequests(1); + } + + @Test + public void testEntriesExpire() { + monitor.connected(HOST_A); + addSuccess(HOST_A); + monitor.released(HOST_A); + + verifyConnections(HOST_A, 0); + verifyRequests(HOST_A, 1); + + monitor.connected(HOST_B); + addSuccess(HOST_B); + + verifyConnections(HOST_B, 1); + verifyRequests(HOST_B, 1); + + // Fake + clock.advance(Amount.of(TrafficMonitor.DEFAULT_GC_INTERVAL.as(Time.SECONDS) + 1, Time.SECONDS)); + monitor.gc(); + + verifyConnections(HOST_A, 0); + verifyRequests(HOST_A, 0); + verifyConnections(HOST_B, 1); + verifyRequests(HOST_B, 1); + verifyLifetimeRequests(2); + } + + private void addSuccess(String host) { + monitor.requestResult(host, RequestTracker.RequestResult.SUCCESS, 0L); + } + + private void verifyConnections(String host, int count) { + TrafficMonitor<String>.TrafficInfo info = monitor.getTrafficInfo().get(host); + + if (count > 0) assertNotNull(info); + + if (info != null) { + assertThat(monitor.getTrafficInfo().get(host).getConnectionCount(), is(count)); + } + } + + private void verifyRequests(String host, int count) { + TrafficMonitor<String>.TrafficInfo info = monitor.getTrafficInfo().get(host); + + if (count > 0) assertNotNull(info); + + if (info != null) { + assertThat(monitor.getTrafficInfo().get(host).getRequestSuccessCount(), is(count)); + } + } + + private void verifyLifetimeRequests(long count) { + assertThat(monitor.getLifetimeRequestCount(), is(count)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/pool/ConnectionPoolTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/pool/ConnectionPoolTest.java b/commons/src/test/java/com/twitter/common/net/pool/ConnectionPoolTest.java new file mode 100644 index 0000000..4eb575f --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/pool/ConnectionPoolTest.java @@ -0,0 +1,258 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stats; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; + +/** + * @author John Sirois + */ +public class ConnectionPoolTest { + private IMocksControl control; + private ConnectionFactory<Connection<String, Integer>> connectionFactory; + private ReentrantLock poolLock; + + @Before public void setUp() throws Exception { + control = EasyMock.createControl(); + + @SuppressWarnings("unchecked") ConnectionFactory<Connection<String, Integer>> connectionFactory = + control.createMock(ConnectionFactory.class); + this.connectionFactory = connectionFactory; + + poolLock = new ReentrantLock(); + } + + @Test(expected = IllegalArgumentException.class) + public void testReleaseUnmanaged() { + @SuppressWarnings("unchecked") + Connection<String, Integer> connection = control.createMock(Connection.class); + + Executor executor = createMockExecutor(); + control.replay(); + + try { + createConnectionPool(executor).release(connection); + } finally { + control.verify(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testReleaseUnmanagedIdentity() throws Exception { + class TestConnection implements Connection<String, Integer> { + @Override public String get() { + return "test"; + } + + @Override public boolean isValid() { + return true; + } + + @Override public void close() { + // noop + } + + @Override public Integer getEndpoint() { + return 1; + } + + @Override public boolean equals(Object obj) { + return obj instanceof TestConnection; + } + } + + Executor executor = createMockExecutor(); + + TestConnection connection = new TestConnection(); + expect(connectionFactory.create(eq(ObjectPool.NO_TIMEOUT))).andReturn(connection); + + control.replay(); + + ConnectionPool<Connection<String, Integer>> connectionPool = createConnectionPool(executor); + assertSame(connection, connectionPool.get()); + + TestConnection equalConnection = new TestConnection(); + assertEquals(equalConnection, connection); + try { + connectionPool.release(equalConnection); + } finally { + control.verify(); + } + } + + @Test(expected = ResourceExhaustedException.class) + public void testExhaustedNull() throws Exception { + Executor executor = createMockExecutor(); + expect(connectionFactory.create(eq(ObjectPool.NO_TIMEOUT))).andReturn(null); + control.replay(); + + try { + createConnectionPool(executor).get(); + } finally { + control.verify(); + } + } + + @Test(expected = TimeoutException.class) + public void testExhaustedWillNot() throws Exception { + Executor executor = createMockExecutor(); + + @SuppressWarnings("unchecked") + Connection<String, Integer> connection = control.createMock(Connection.class); + expect(connectionFactory.create(eq(ObjectPool.NO_TIMEOUT))).andReturn(connection); + + expect(connectionFactory.mightCreate()).andReturn(false); + control.replay(); + + ConnectionPool<Connection<String, Integer>> connectionPool = createConnectionPool(executor); + assertSame(connection, connectionPool.get()); + + try { + connectionPool.get(Amount.of(1L, Time.NANOSECONDS)); + } finally { + control.verify(); + } + } + + @Test + public void testCloseDisallowsGets() throws Exception { + Executor executor = createMockExecutor(); + control.replay(); + + ConnectionPool<Connection<String, Integer>> connectionPool = createConnectionPool(executor); + connectionPool.close(); + + try { + connectionPool.get(); + fail(); + } catch (IllegalStateException e) { + // expected + } + + try { + connectionPool.get(Amount.of(1L, Time.MILLISECONDS)); + fail(); + } catch (IllegalStateException e) { + // expected + } + + control.verify(); + } + + @Test + public void testCloseCloses() throws Exception { + Executor executor = Executors.newSingleThreadExecutor(); + + @SuppressWarnings("unchecked") + Connection<String, Integer> connection = control.createMock(Connection.class); + expect(connectionFactory.create(eq(ObjectPool.NO_TIMEOUT))).andReturn(connection); + + @SuppressWarnings("unchecked") + Connection<String, Integer> connection2 = control.createMock(Connection.class); + expect(connectionFactory.create(eq(ObjectPool.NO_TIMEOUT))).andReturn(connection2); + expect(connectionFactory.mightCreate()).andReturn(true); + + connectionFactory.destroy(connection2); + expect(connection2.isValid()).andReturn(true); + + control.replay(); + + ConnectionPool<Connection<String, Integer>> connectionPool = createConnectionPool(executor); + + // This 1st connection is leased out of the pool at close-time and so should not be touched + Connection<String, Integer> leasedDuringClose = connectionPool.get(); + + // this 2nd connection is available when close is called so it should be destroyed + connectionPool.release(connectionPool.get()); + connectionPool.close(); + + control.verify(); + control.reset(); + + connectionFactory.destroy(connection); + + control.replay(); + + // After a close, releases should destroy connections + connectionPool.release(leasedDuringClose); + + control.verify(); + } + + @Test + public void testCreating() throws Exception { + Amount<Long, Time> timeout = Amount.of(1L, Time.SECONDS); + + Executor executor = + new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()); + + expect(connectionFactory.mightCreate()).andReturn(true); + + Capture<Amount<Long, Time>> timeout1 = new Capture<Amount<Long, Time>>(); + @SuppressWarnings("unchecked") + Connection<String, Integer> connection1 = control.createMock(Connection.class); + expect(connectionFactory.create(capture(timeout1))).andReturn(connection1); + + Capture<Amount<Long, Time>> timeout2 = new Capture<Amount<Long, Time>>(); + @SuppressWarnings("unchecked") + Connection<String, Integer> connection2 = control.createMock(Connection.class); + expect(connectionFactory.create(capture(timeout2))).andReturn(connection2); + + control.replay(); + + ConnectionPool<Connection<String, Integer>> connectionPool = createConnectionPool(executor); + + assertSame(connection1, connectionPool.get(timeout)); + assertTrue(timeout1.hasCaptured()); + Long timeout1Millis = timeout1.getValue().as(Time.MILLISECONDS); + assertTrue(timeout1Millis > 0 && timeout1Millis <= timeout.as(Time.MILLISECONDS)); + + assertSame(connection2, connectionPool.get(timeout)); + assertTrue(timeout2.hasCaptured()); + Long timeout2Millis = timeout1.getValue().as(Time.MILLISECONDS); + assertTrue(timeout2Millis > 0 && timeout2Millis <= timeout.as(Time.MILLISECONDS)); + + control.verify(); + } + + private Executor createMockExecutor() { + return control.createMock(Executor.class); + } + + private ConnectionPool<Connection<String, Integer>> createConnectionPool(Executor executor) { + return new ConnectionPool<Connection<String, Integer>>(executor, poolLock, + connectionFactory, Stats.STATS_PROVIDER); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/pool/DynamicHostSetUtilTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/pool/DynamicHostSetUtilTest.java b/commons/src/test/java/com/twitter/common/net/pool/DynamicHostSetUtilTest.java new file mode 100644 index 0000000..d7c71c3 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/pool/DynamicHostSetUtilTest.java @@ -0,0 +1,61 @@ +// ================================================================================================= +// Copyright 2014 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import org.easymock.Capture; +import org.easymock.IAnswer; +import org.junit.Test; + +import com.twitter.common.base.Command; +import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; +import com.twitter.common.net.pool.DynamicHostSet.MonitorException; +import com.twitter.common.testing.easymock.EasyMockTest; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class DynamicHostSetUtilTest extends EasyMockTest { + + @Test + public void testSnapshot() throws MonitorException { + DynamicHostSet<String> hostSet = createMock(new Clazz<DynamicHostSet<String>>() { }); + final Capture<HostChangeMonitor<String>> monitorCapture = createCapture(); + final Command unwatchCommand = createMock(Command.class); + + expect(hostSet.watch(capture(monitorCapture))).andAnswer(new IAnswer<Command>() { + @Override public Command answer() throws Throwable { + // Simulate the 1st blocking onChange callback. + HostChangeMonitor<String> monitor = monitorCapture.getValue(); + monitor.onChange(ImmutableSet.of("jack", "jill")); + return unwatchCommand; + } + }); + + // Confirm we clean up our watch. + unwatchCommand.execute(); + expectLastCall(); + + control.replay(); + + ImmutableSet<String> snapshot = DynamicHostSetUtil.getSnapshot(hostSet); + assertEquals(ImmutableSet.of("jack", "jill"), snapshot); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/net/pool/DynamicPoolTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/net/pool/DynamicPoolTest.java b/commons/src/test/java/com/twitter/common/net/pool/DynamicPoolTest.java new file mode 100644 index 0000000..aa94e28 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/net/pool/DynamicPoolTest.java @@ -0,0 +1,171 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.twitter.common.base.Closure; +import com.twitter.common.base.Closures; +import com.twitter.common.base.Function; +import com.twitter.common.collections.Pair; +import com.twitter.common.thrift.testing.MockTSocket; +import com.twitter.common.net.loadbalancing.LoadBalancerImpl; +import com.twitter.common.net.loadbalancing.RandomStrategy; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.thrift.TTransportConnection; +import com.twitter.common.thrift.Util; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.ServerSet; +import com.twitter.common.zookeeper.ServerSet.EndpointStatus; +import com.twitter.common.zookeeper.ServerSetImpl; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; +import org.apache.thrift.transport.TTransport; +import org.apache.zookeeper.ZooDefs; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; + +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.*; + +/** + * @author John Sirois + */ +public class DynamicPoolTest extends BaseZooKeeperTest { + + private IMocksControl control; + private Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> + poolFactory; + private DynamicPool connectionPool; + private LinkedBlockingQueue<Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>>> poolRebuilds; + + private ServerSet serverSet; + + @Before + public void mySetUp() throws Exception { + control = createControl(); + + @SuppressWarnings("unchecked") + Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> poolFactory = + control.createMock(Function.class); + this.poolFactory = poolFactory; + + LoadBalancerImpl<InetSocketAddress> lb = + LoadBalancerImpl.create(new RandomStrategy<InetSocketAddress>()); + + poolRebuilds = + new LinkedBlockingQueue<Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>>>(); + serverSet = new ServerSetImpl(createZkClient(), ZooDefs.Ids.OPEN_ACL_UNSAFE, "/test-service"); + Closure<Collection<InetSocketAddress>> onBackendsChosen = Closures.noop(); + Amount<Long, Time> restoreInterval = Amount.of(1L, Time.MINUTES); + connectionPool = new DynamicPool<ServiceInstance, TTransport, InetSocketAddress>( + serverSet, poolFactory, lb, onBackendsChosen, restoreInterval, Util.GET_ADDRESS, + Util.IS_ALIVE) { + @Override + void poolRebuilt(Set<ObjectPool<Connection<TTransport, InetSocketAddress>>> deadPools, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> livePools) { + super.poolRebuilt(deadPools, livePools); + poolRebuilds.offer(Pair.of(deadPools, livePools)); + } + }; + } + + @Test + public void testConstructionBlocksOnInitialPoolBuild() { + assertNotNull(Iterables.getOnlyElement(poolRebuilds)); + } + + @Test(expected = ResourceExhaustedException.class) + public void testNoEndpointsAvailable() throws Exception { + connectionPool.get(); + } + + private EndpointStatus join(String host) throws JoinException, InterruptedException { + return serverSet.join( + InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of()); + } + + @Test + @SuppressWarnings("unchecked") + public void testPoolRebuilds() throws Exception { + ConnectionFactory<Connection<TTransport, InetSocketAddress>> connectionFactory = + control.createMock(ConnectionFactory.class); + + TTransport transport = new MockTSocket(); + Connection<TTransport, InetSocketAddress> connection = + new TTransportConnection(transport, InetSocketAddress.createUnresolved("jake", 1137)); + + expect(connectionFactory.create(EasyMock.isA(Amount.class))).andReturn(connection); + ConnectionPool<Connection<TTransport, InetSocketAddress>> fooPool = + new ConnectionPool<Connection<TTransport, InetSocketAddress>>(connectionFactory); + expect(poolFactory.apply(InetSocketAddress.createUnresolved("foo", 42))).andReturn(fooPool); + + control.replay(); + + Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>> + rebuild1 = poolRebuilds.take(); + assertTrue("Should not have any dead pools on initial rebuild", rebuild1.getFirst().isEmpty()); + assertNoLivePools(rebuild1); + + EndpointStatus fooStatus = join("foo"); + + Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>> + rebuild2 = poolRebuilds.take(); + assertTrue("The NULL pool should never be tracked as dead", rebuild2.getFirst().isEmpty()); + assertEquals(transport, connectionPool.get().get()); + + fooStatus.leave(); + + Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>> + rebuild3 = poolRebuilds.take(); + assertSame("Expected foo pool to be discarded", fooPool, + Iterables.getOnlyElement(rebuild3.getFirst())); + assertNoLivePools(rebuild1); + + control.verify(); + } + + private void assertNoLivePools(Pair<Set<ObjectPool<Connection<TTransport, InetSocketAddress>>>, + Map<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>> rebuild) + throws TimeoutException { + + assertTrue("Expected no live pools to be set", rebuild.getSecond().isEmpty()); + try { + connectionPool.get(); + fail("Expected server set to be exhausted with no endpoints"); + } catch (ResourceExhaustedException e) { + // expected + } + } +}
