http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java new file mode 100644 index 0000000..45faf5d --- /dev/null +++ b/commons/src/test/java/com/twitter/common/stats/WindowedHistogramTest.java @@ -0,0 +1,237 @@ +// ================================================================================================= +// Copyright 2013 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import org.junit.Test; + +import com.twitter.common.objectsize.ObjectSizeCalculator; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Data; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.testing.RealHistogram; +import com.twitter.common.util.testing.FakeClock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import static com.twitter.common.stats.WindowedApproxHistogram.DEFAULT_MAX_MEMORY; + +/** + * Tests WindowedHistogram. + */ +public class WindowedHistogramTest { + + @Test + public void testEmptyWinHistogram() { + WindowedApproxHistogram wh = new WindowedApproxHistogram(); + assertEquals(0L, wh.getQuantile(0.0)); + } + + @Test + public void testWinHistogramWithEdgeCases() { + FakeClock clock = new FakeClock(); + Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS); + int slices = 10; + long sliceDuration = window.as(Time.NANOSECONDS) / slices; + WindowedApproxHistogram h = + new WindowedApproxHistogram(window, slices, DEFAULT_MAX_MEMORY, clock); + + h.add(Long.MIN_VALUE); + clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS)); + assertEquals(Long.MIN_VALUE, h.getQuantile(0.0)); + assertEquals(Long.MIN_VALUE, h.getQuantile(0.5)); + assertEquals(Long.MIN_VALUE, h.getQuantile(1.0)); + + h.add(Long.MAX_VALUE); + clock.advance(Amount.of(2 * sliceDuration, Time.NANOSECONDS)); + assertEquals(Long.MIN_VALUE, h.getQuantile(0.0)); + assertEquals(Long.MIN_VALUE, h.getQuantile(0.25)); + assertEquals(Long.MAX_VALUE, h.getQuantile(0.75)); + assertEquals(Long.MAX_VALUE, h.getQuantile(1.0)); + } + + @Test + public void testClearedWinHistogram() { + FakeClock clock = new FakeClock(); + Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS); + int slices = 10; + Amount<Long, Time> sliceDuration = Amount.of( + window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS); + WindowedHistogram<?> h = createFullHistogram(window, slices, clock); + long p0 = h.getQuantile(0.1); + long p50 = h.getQuantile(0.5); + long p90 = h.getQuantile(0.9); + assertFalse(0 == p0); + assertFalse(0 == p50); + assertFalse(0 == p90); + + h.clear(); + + assertEquals(0, h.getQuantile(0.1)); + assertEquals(0, h.getQuantile(0.5)); + assertEquals(0, h.getQuantile(0.9)); + + // reload the histogram with the exact same values than before + fillHistogram(h, sliceDuration, slices, clock); + + assertEquals(p0, h.getQuantile(0.1)); + assertEquals(p50, h.getQuantile(0.5)); + assertEquals(p90, h.getQuantile(0.9)); + } + + @Test + public void testSimpleWinHistogram() { + FakeClock clock = new FakeClock(); + Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS); + int slices = 10; + WindowedHistogram<?> wh = createFullHistogram(window, slices, clock); + + // check that the global distribution is the aggregation of all underlying histograms + for (int i = 1; i <= slices; i++) { + double q = (double) i / slices; + assertEquals(i, wh.getQuantile(q), 1.0); + } + + // advance in time an forget about old values + long sliceDuration = window.as(Time.NANOSECONDS) / slices; + clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS)); + for (int j = 0; j < 1000; j++) { + wh.add(11); + } + assertEquals(2, wh.getQuantile(0.05), 1.0); + assertEquals(11, wh.getQuantile(0.99), 1.0); + } + + @Test + public void testWinHistogramWithGap() { + FakeClock clock = new FakeClock(); + Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS); + int slices = 10; + WindowedHistogram<?> wh = createFullHistogram(window, slices, clock); + // wh is a WindowedHistogram of 10 slices + the empty current with values from 1 to 10 + // [1][2][3][4][5][6][7][8][9][10][.] + // ^ + + for (int j = 0; j < 1000; j++) { + wh.add(100); + } + // [1][2][3][4][5][6][7][8][9][10][100] + // ^ + // quantiles are computed based on [1] -> [10] + + clock.advance(Amount.of((slices - 1) * 100L / slices, Time.MILLISECONDS)); + for (int j = 0; j < 1000; j++) { + wh.add(200); + } + // [1][2][3][4][5][6][7][8][200][10][100] + // ^ + // quantiles are computed based on [10][100][1][2][3][4][5][6][7][8] + // and removing old ones [10][100][.][.][.][.][.][.][.][.] + // all the histograms between 100 and 200 are old and shouldn't matter in the computation of + // quantiles. + assertEquals(10L, wh.getQuantile(0.25), 1.0); + assertEquals(100L, wh.getQuantile(0.75), 1.0); + + clock.advance(Amount.of(100L / slices, Time.MILLISECONDS)); + // [1][2][3][4][5][6][7][8][200][10][100] + // ^ + // quantiles are computed based on [100][1][2][3][4][5][6][7][8][200] + // and removing old ones [100][.][.][.][.][.][.][.][.][200] + + assertEquals(100L, wh.getQuantile(0.25), 1.0); + assertEquals(200L, wh.getQuantile(0.75), 1.0); + + // advance a lot in time, everything should be "forgotten" + clock.advance(Amount.of(500L, Time.MILLISECONDS)); + assertEquals(0L, wh.getQuantile(0.5), 1.0); + } + + @Test + public void testWinHistogramMemory() { + ImmutableList.Builder<Amount<Long, Data>> builder = ImmutableList.builder(); + builder.add(Amount.of(8L, Data.KB)); + builder.add(Amount.of(12L, Data.KB)); + builder.add(Amount.of(16L, Data.KB)); + builder.add(Amount.of(20L, Data.KB)); + builder.add(Amount.of(24L, Data.KB)); + builder.add(Amount.of(32L, Data.KB)); + builder.add(Amount.of(64L, Data.KB)); + builder.add(Amount.of(256L, Data.KB)); + builder.add(Amount.of(1L, Data.MB)); + builder.add(Amount.of(16L, Data.MB)); + builder.add(Amount.of(32L, Data.MB)); + List<Amount<Long, Data>> sizes = builder.build(); + + // large estimation of the memory used outside of buffers + long fixSize = Amount.of(4, Data.KB).as(Data.BYTES); + + for (Amount<Long, Data> maxSize: sizes) { + WindowedApproxHistogram hist = new WindowedApproxHistogram( + Amount.of(60L, Time.SECONDS), 6, maxSize); + hist.add(1L); + hist.getQuantile(0.5); + long size = ObjectSizeCalculator.getObjectSize(hist); + // reverting CI JVM seems to have different memory consumption than mine + //assertTrue(size < fixSize + maxSize.as(Data.BYTES)); + } + } + + @Test + public void testWinHistogramAccuracy() { + FakeClock ticker = new FakeClock(); + Amount<Long, Time> window = Amount.of(100L, Time.MILLISECONDS); + int slices = 10; + Amount<Long, Time> sliceDuration = Amount.of( + window.as(Time.NANOSECONDS) / slices, Time.NANOSECONDS); + WindowedHistogram<?> wh = createFullHistogram(window, slices, ticker); + RealHistogram rh = fillHistogram(new RealHistogram(), sliceDuration, slices, new FakeClock()); + + assertEquals(wh.getQuantile(0.5), rh.getQuantile(0.5)); + assertEquals(wh.getQuantile(0.75), rh.getQuantile(0.75)); + assertEquals(wh.getQuantile(0.9), rh.getQuantile(0.9)); + assertEquals(wh.getQuantile(0.99), rh.getQuantile(0.99)); + } + + /** + * @return a WindowedHistogram with different value in each underlying Histogram + */ + private WindowedHistogram<?> createFullHistogram( + Amount<Long, Time> duration, int slices, FakeClock clock) { + long sliceDuration = duration.as(Time.NANOSECONDS) / slices; + WindowedApproxHistogram wh = new WindowedApproxHistogram(duration, slices, + DEFAULT_MAX_MEMORY, clock); + clock.advance(Amount.of(1L, Time.NANOSECONDS)); + + return fillHistogram(wh, Amount.of(sliceDuration, Time.NANOSECONDS), slices, clock); + } + + private <H extends Histogram> H fillHistogram(H h, + Amount<Long, Time> sliceDuration, int slices, FakeClock clock) { + for (int i = 1; i <= slices; i++) { + for (int j = 0; j < 1000; j++) { + h.add(i); + } + clock.advance(sliceDuration); + } + return h; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java new file mode 100644 index 0000000..ed821ea --- /dev/null +++ b/commons/src/test/java/com/twitter/common/stats/WindowedStatsTest.java @@ -0,0 +1,189 @@ +package com.twitter.common.stats; + +import org.junit.Test; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.WindowedStatistics; +import com.twitter.common.util.testing.FakeClock; + +import static org.junit.Assert.assertEquals; + +public class WindowedStatsTest { + private Amount<Long, Time> window = Amount.of(1L, Time.MINUTES); + private int slices = 3; + private long sliceDuration = window.as(Time.NANOSECONDS) / slices; + + @Test + public void testEmptyStats() { + FakeClock clock = new FakeClock(); + WindowedStatistics ws = new WindowedStatistics(window, slices, clock); + + assertEmpty(ws); + } + + @Test + public void testStatsCorrectness() { + FakeClock clock = new FakeClock(); + Statistics reference = new Statistics(); + WindowedStatistics ws = new WindowedStatistics(window, slices, clock); + + for (int i=0; i<1000; i++) { + reference.accumulate(i); + ws.accumulate(i); + } + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + + assertEquals(reference.max(), ws.max()); + assertEquals(reference.min(), ws.min()); + assertEquals(reference.populationSize(), ws.populationSize()); + assertEquals(reference.sum(), ws.sum()); + assertEquals(reference.range(), ws.range()); + assertEquals(reference.mean(), ws.mean(), 0.01); + assertEquals(reference.variance(), ws.variance(), 0.01); + assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01); + + for (int i=0; i<1000; i++) { + long x = i + 500; + reference.accumulate(x); + ws.accumulate(x); + } + clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + + assertEquals(reference.max(), ws.max()); + assertEquals(reference.min(), ws.min()); + assertEquals(reference.populationSize(), ws.populationSize()); + assertEquals(reference.sum(), ws.sum()); + assertEquals(reference.range(), ws.range()); + assertEquals(reference.mean(), ws.mean(), 0.01); + assertEquals(reference.variance(), ws.variance(), 0.01); + assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01); + + for (int i=0; i<1000; i++) { + long x = i * i; + reference.accumulate(x); + ws.accumulate(x); + } + clock.advance(Amount.of(sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + + assertEquals(reference.max(), ws.max()); + assertEquals(reference.min(), ws.min()); + assertEquals(reference.populationSize(), ws.populationSize()); + assertEquals(reference.sum(), ws.sum()); + assertEquals(reference.range(), ws.range()); + assertEquals(reference.mean(), ws.mean(), 0.01); + assertEquals(reference.variance(), ws.variance(), 0.01); + assertEquals(reference.standardDeviation(), ws.standardDeviation(), 0.01); + } + + @Test + public void testWindowStats() { + FakeClock clock = new FakeClock(); + WindowedStatistics ws = new WindowedStatistics(window, slices, clock); + ws.accumulate(1L); + assertEmpty(ws); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + + assertEquals(1L, ws.max()); + assertEquals(1L, ws.min()); + assertEquals(1L, ws.populationSize()); + assertEquals(1L, ws.sum()); + assertEquals(1.0, ws.mean(), 0.01); + assertEquals(0.0, ws.standardDeviation(), 0.01); + + clock.advance(Amount.of(slices * sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + assertEmpty(ws); + } + + @Test + public void testCleaningOfExpiredWindows() { + FakeClock clock = new FakeClock(); + WindowedStatistics ws = new WindowedStatistics(window, slices, clock); + + long n = 1000L; + for (int i=0; i<n; i++) { + ws.accumulate(i); + } + assertEmpty(ws); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + assertEquals(n, ws.populationSize()); // this window is not empty + + clock.advance(Amount.of(100 * sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + assertEmpty(ws); // this window has been cleaned + } + + @Test + public void testAddNewValueToFullWS() { + FakeClock clock = new FakeClock(); + WindowedStatistics ws = new WindowedStatistics(window, slices, clock); + + // AAAAA + // BBBBB + // CCCCC + // DDDDD + // | | | | + //---------------------------------> t + // t=0 t=1 t=2 t=3 + + // t=0 fill {D} + long n = 1000L; + for (int i=0; i<n; i++) { + ws.accumulate(i); + } + // read {A,B,C}, which should be empty + assertEmpty(ws); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + // t=1, read {B,C,D} which shouldn't be empty + + assertEquals(n - 1L, ws.max()); + assertEquals(0L, ws.min()); + assertEquals(n, ws.populationSize()); + assertEquals(n * (n - 1) / 2, ws.sum()); + assertEquals((n - 1) / 2.0, ws.mean(), 0.01); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + // t=2, read {C,D,A} which shouldn't be empty as well + + assertEquals(n - 1L, ws.max()); + assertEquals(0L, ws.min()); + assertEquals(n, ws.populationSize()); + assertEquals(n * (n - 1) / 2, ws.sum()); + assertEquals((n - 1) / 2.0, ws.mean(), 0.01); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + // t=3, read {D,A,B} which shouldn't be empty as well + + assertEquals(n - 1L, ws.max()); + assertEquals(0L, ws.min()); + assertEquals(n, ws.populationSize()); + assertEquals(n * (n - 1) / 2, ws.sum()); + assertEquals((n - 1) / 2.0, ws.mean(), 0.01); + + clock.advance(Amount.of(1 + sliceDuration, Time.NANOSECONDS)); + ws.refresh(); + // t=4, read {A,B,C} which must be empty (cleaned by the Windowed class) + assertEmpty(ws); + } + + private void assertEmpty(WindowedStatistics ws) { + assertEquals(Long.MIN_VALUE, ws.max()); + assertEquals(Long.MAX_VALUE, ws.min()); + assertEquals(0L, ws.populationSize()); + assertEquals(0L, ws.sum()); + assertEquals(0.0, ws.mean(), 0.01); + assertEquals(0.0, ws.standardDeviation(), 0.01); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/stats/WindowedTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/stats/WindowedTest.java b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java new file mode 100644 index 0000000..17526ea --- /dev/null +++ b/commons/src/test/java/com/twitter/common/stats/WindowedTest.java @@ -0,0 +1,113 @@ +// ================================================================================================= +// Copyright 2013 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.stats; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; + +import org.junit.Test; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; +import com.twitter.common.util.testing.FakeClock; + +import junit.framework.Assert; + +/** + * Test the Windowed abstract class by making a very simple implementation. + */ +public class WindowedTest { + + private class WindowedBox extends Windowed<Integer[]> { + WindowedBox(Amount<Long, Time > window, int slices, Clock clock) { + super(Integer[].class, window, slices, + new Supplier<Integer[]>() { + @Override public Integer[] get() { + Integer[] box = new Integer[1]; + box[0] = 0; + return box; + } + }, + new Function<Integer[], Integer[]>() { + @Override public Integer[] apply(Integer[] xs) { + xs[0] = 0; + return xs; + } + }, clock); + } + + void increment() { + getCurrent()[0] += 1; + } + + int sum() { + int s = 0; + for (Integer[] box : getTenured()) { + s += box[0]; + } + return s; + } + } + + @Test + public void testWindowed() { + Amount<Long, Time > window = Amount.of(1L, Time.MINUTES); + int slices = 3; + Amount<Long, Time > delta = Amount.of( + Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS) / 3, Time.NANOSECONDS); + FakeClock clock = new FakeClock(); + WindowedBox win = new WindowedBox(window, slices, clock); + // [0][0][0][0] + clock.advance(Amount.of(1L, Time.NANOSECONDS)); + + win.increment(); + // [0][0][0][1] + Assert.assertEquals(0, win.sum()); + + clock.advance(delta); + win.increment(); + win.increment(); + Assert.assertEquals(1, win.sum()); + // [0][0][1][2] + + clock.advance(delta); + win.increment(); + win.increment(); + win.increment(); + Assert.assertEquals(3, win.sum()); + // [0][1][2][3] + + clock.advance(delta); + win.increment(); + win.increment(); + win.increment(); + win.increment(); + Assert.assertEquals(6, win.sum()); + // [1][2][3][4] + + clock.advance(delta); + win.increment(); + win.increment(); + win.increment(); + win.increment(); + win.increment(); + Assert.assertEquals(9, win.sum()); + // [2][3][4][5] + } + +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java new file mode 100644 index 0000000..56630fa --- /dev/null +++ b/commons/src/test/java/com/twitter/common/testing/TearDownRegistryTest.java @@ -0,0 +1,49 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.testing; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.testing.junit4.TearDownTestCase; + +import org.junit.Test; + +import com.twitter.common.base.Command; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author John Sirois + */ +public class TearDownRegistryTest extends TearDownTestCase { + + @Test + public void testTearDown() { + TearDownRegistry tearDownRegistry = new TearDownRegistry(this); + final AtomicBoolean actionExecuted = new AtomicBoolean(false); + tearDownRegistry.addAction(new Command() { + @Override public void execute() { + actionExecuted.set(true); + } + }); + + assertFalse(actionExecuted.get()); + tearDown(); + assertTrue(actionExecuted.get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java new file mode 100644 index 0000000..8546935 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/testing/easymock/EasyMockTestTest.java @@ -0,0 +1,63 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.testing.easymock; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.ImmutableList; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Test; + +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author John Sirois + */ +public class EasyMockTestTest extends EasyMockTest { + + @Test + public void testSimplyParametrizedMock() { + final AtomicBoolean ran = new AtomicBoolean(false); + + Runnable runnable = createMock(new Clazz<Runnable>() { }); + runnable.run(); + expectLastCall().andAnswer(new IAnswer<Void>() { + @Override public Void answer() { + ran.set(true); + return null; + } + }); + control.replay(); + + runnable.run(); + assertTrue(ran.get()); + } + + @Test + public void testNestedParametrizedMock() { + List<List<String>> list = createMock(new Clazz<List<List<String>>>() { }); + EasyMock.expect(list.get(0)).andReturn(ImmutableList.of("jake")); + control.replay(); + + assertEquals(ImmutableList.of("jake"), list.get(0)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java new file mode 100644 index 0000000..07a5aa9 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/testing/easymock/IterableEqualsTest.java @@ -0,0 +1,53 @@ +package com.twitter.common.testing.easymock; + +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; + +import static com.twitter.common.testing.easymock.IterableEquals.eqCollection; +import static com.twitter.common.testing.easymock.IterableEquals.eqIterable; +import static com.twitter.common.testing.easymock.IterableEquals.eqList; + +public class IterableEqualsTest extends EasyMockTest { + private static final List<Integer> TEST = ImmutableList.of(1, 2, 3, 2); + private static final String OK = "ok"; + private Thing thing; + + public interface Thing { + String testIterable(Iterable<Integer> input); + String testCollection(Collection<Integer> input); + String testList(List<Integer> input); + } + + @Before + public void setUp() { + thing = createMock(Thing.class); + } + + @Test + public void testIterableEquals() { + expect(thing.testIterable(eqIterable(TEST))).andReturn(OK); + control.replay(); + thing.testIterable(ImmutableList.of(3, 2, 2, 1)); + } + + @Test + public void testCollectionEquals() { + expect(thing.testCollection(eqCollection(TEST))).andReturn(OK); + control.replay(); + thing.testCollection(ImmutableList.of(3, 2, 2, 1)); + } + + @Test + public void testListEquals() { + expect(thing.testList(eqList(TEST))).andReturn(OK); + control.replay(); + thing.testList(ImmutableList.of(3, 2, 2, 1)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java new file mode 100644 index 0000000..847347b --- /dev/null +++ b/commons/src/test/java/com/twitter/common/testing/junit/rules/RetryTest.java @@ -0,0 +1,233 @@ +// ================================================================================================= +// Copyright 2015 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.testing.junit.rules; + +import java.io.IOException; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import javax.annotation.Nullable; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.MethodRule; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.Statement; + +// SUPPRESS CHECKSTYLE:OFF IllegalThrows +public class RetryTest { + + public abstract static class RetryTrackingTestBase { + private static int tries; + + @BeforeClass + public static void resetTries() { + tries = 0; + } + + enum Result { + FAILURE() { + @Override void execute() throws Throwable { + Assert.fail("Simulated assertion failure."); + } + }, + ERROR() { + @Override void execute() throws Throwable { + throw new IOException("Simulated unexpected error."); + } + }, + SUCCESS() { + @Override void execute() throws Throwable { + Assert.assertTrue("Simulated successful assertion.", true); + } + }; + + abstract void execute() throws Throwable; + } + + @Rule public Retry.Rule retry = new Retry.Rule(); + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface AssertRetries { + int expectedTries(); + int expectedMaxRetries(); + Result expectedResult(); + } + + @Rule + public MethodRule testRetries = new MethodRule() { + @Override + public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) { + final AssertRetries assertRetries = method.getAnnotation(AssertRetries.class); + Assert.assertNotNull(assertRetries); + return new Statement() { + @Override public void evaluate() throws Throwable { + try { + statement.evaluate(); + if (assertRetries.expectedResult() == Result.SUCCESS) { + Assert.assertEquals(assertRetries.expectedTries(), tries); + } else { + Assert.fail("Expected success, found " + assertRetries.expectedResult()); + } + } catch (Retry.Rule.RetriedAssertionError e) { + if (assertRetries.expectedResult() == Result.FAILURE) { + Assert.assertEquals(assertRetries.expectedTries(), tries); + Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries()); + Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber()); + } else { + Assert.fail("Expected failure, found " + assertRetries.expectedResult()); + } + } catch (Retry.Rule.RetriedException e) { + if (assertRetries.expectedResult() == Result.ERROR) { + Assert.assertEquals(assertRetries.expectedTries(), tries); + Assert.assertEquals(assertRetries.expectedMaxRetries(), e.getMaxRetries()); + Assert.assertEquals(assertRetries.expectedTries(), e.getTryNumber()); + } else { + Assert.fail("Expected error, found " + assertRetries.expectedResult()); + } + } + } + }; + } + }; + + protected void doTest(int successfulTries) throws Throwable { + doTest(successfulTries, null); + } + + protected void doTest(int successfulTries, @Nullable Result lastResult) throws Throwable { + tries++; + if (lastResult != null && tries > successfulTries) { + lastResult.execute(); + } + } + } + + public static class DefaultRetrySuccessTest extends RetryTrackingTestBase { + @Test + @Retry + @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.SUCCESS) + public void test() throws Throwable { + doTest(2); + } + } + + public static class DefaultRetryFailFastTest extends RetryTrackingTestBase { + @Test + @Retry + @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.FAILURE) + public void test() throws Throwable { + doTest(0, Result.FAILURE); + } + } + + public static class DefaultRetryFailLastTest extends RetryTrackingTestBase { + @Test + @Retry + @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.FAILURE) + public void test() throws Throwable { + doTest(1, Result.FAILURE); + } + } + + public static class DefaultRetryErrorFastTest extends RetryTrackingTestBase { + @Test + @Retry + @AssertRetries(expectedTries = 1, expectedMaxRetries = 1, expectedResult = Result.ERROR) + public void test() throws Throwable { + doTest(0, Result.ERROR); + } + } + + public static class DefaultRetryErrorLastTest extends RetryTrackingTestBase { + @Test + @Retry + @AssertRetries(expectedTries = 2, expectedMaxRetries = 1, expectedResult = Result.ERROR) + public void test() throws Throwable { + doTest(1, Result.ERROR); + } + } + + public static class ZeroRetrySuccessTest extends RetryTrackingTestBase { + @Test + @Retry(times = 0) + @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS) + public void test() throws Throwable { + doTest(1, Result.SUCCESS); + } + } + + public static class NegativeRetrySuccessTest extends RetryTrackingTestBase { + @Test + @Retry(times = -1) + @AssertRetries(expectedTries = 1, expectedMaxRetries = 0, expectedResult = Result.SUCCESS) + public void test() throws Throwable { + doTest(1, Result.SUCCESS); + } + } + + public static class PositiveRetrySuccessTest extends RetryTrackingTestBase { + @Test + @Retry(times = 2) + @AssertRetries(expectedTries = 3, expectedMaxRetries = 2, expectedResult = Result.SUCCESS) + public void test() throws Throwable { + doTest(3, Result.SUCCESS); + } + } + + public static class PositiveRetryFailFastTest extends RetryTrackingTestBase { + @Test + @Retry(times = 2) + @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.FAILURE) + public void test() throws Throwable { + doTest(0, Result.FAILURE); + } + } + + public static class PositiveRetryFailLastTest extends RetryTrackingTestBase { + @Test + @Retry(times = 2) + @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.FAILURE) + public void test() throws Throwable { + doTest(1, Result.FAILURE); + } + } + + public static class PositiveRetryErrorFastTest extends RetryTrackingTestBase { + @Test + @Retry(times = 2) + @AssertRetries(expectedTries = 1, expectedMaxRetries = 2, expectedResult = Result.ERROR) + public void test() throws Throwable { + doTest(0, Result.ERROR); + } + } + + public static class PositiveRetryErrorLastTest extends RetryTrackingTestBase { + @Test + @Retry(times = 2) + @AssertRetries(expectedTries = 2, expectedMaxRetries = 2, expectedResult = Result.ERROR) + public void test() throws Throwable { + doTest(1, Result.ERROR); + } + } +} +// SUPPRESS CHECKSTYLE:ON IllegalThrows http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java new file mode 100644 index 0000000..139d90e --- /dev/null +++ b/commons/src/test/java/com/twitter/common/thrift/ThriftConnectionFactoryTest.java @@ -0,0 +1,143 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.hamcrest.Matcher; +import org.junit.Test; + +import com.twitter.common.net.pool.Connection; +import com.twitter.common.net.pool.ObjectPool; +import com.twitter.common.thrift.testing.MockTSocket; + +/** + * @author John Sirois + */ +public class ThriftConnectionFactoryTest { + + @Test + public void testPreconditions() { + try { + new ThriftConnectionFactory(null, 1, 1); + fail("a non-null host should be required"); + } catch (NullPointerException e) { + // expected + } + + try { + new ThriftConnectionFactory(" ", 1, 1); + fail("a non-blank host should be required"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + new ThriftConnectionFactory("localhost", 0, 1); + fail("a valid concrete remote port should be required"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + new ThriftConnectionFactory("localhost", 65536, 1); + fail("a valid port should be required"); + } catch (IllegalArgumentException e) { + // expected + } + + try { + new ThriftConnectionFactory("localhost", 65535, 0); + fail("a non-zero value for maxConnections should be required"); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testMaxConnections() throws TTransportException, IOException { + ThriftConnectionFactory thriftConnectionFactory = createConnectionFactory(2); + + Connection<TTransport, InetSocketAddress> connection1 = + thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT); + assertOpenConnection(connection1); + + Connection<TTransport, InetSocketAddress> connection2 = + thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT); + assertOpenConnection(connection2); + assertThat(connection1, not(sameInstance(connection2))); + + assertNull("Should've reached maximum connections", + thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT)); + + thriftConnectionFactory.destroy(connection1); + assertClosedConnection(connection1); + + Connection<TTransport, InetSocketAddress> connection3 = + thriftConnectionFactory.create(ObjectPool.NO_TIMEOUT); + assertOpenConnection(connection3); + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. + Matcher<Connection<TTransport, InetSocketAddress>> matcher = + allOf(not(sameInstance(connection1)), not(sameInstance(connection2))); + assertThat(connection3, matcher); + } + + @Test(expected = IllegalArgumentException.class) + public void testInactiveConnectionReturn() { + createConnectionFactory(1).destroy(new TTransportConnection(new MockTSocket(), + InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT))); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullConnectionReturn() { + createConnectionFactory(1).destroy(null); + } + + private void assertOpenConnection(Connection<TTransport, InetSocketAddress> connection) { + assertNotNull(connection); + assertTrue(connection.isValid()); + assertTrue(connection.get().isOpen()); + } + + private void assertClosedConnection(Connection<TTransport, InetSocketAddress> connection) { + assertFalse(connection.isValid()); + assertFalse(connection.get().isOpen()); + } + + private ThriftConnectionFactory createConnectionFactory(int maxConnections) { + return new ThriftConnectionFactory("foo", 1234, maxConnections) { + @Override TTransport createTransport(int timeoutMillis) throws TTransportException { + TTransport transport = new MockTSocket(); + transport.open(); + return transport; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java new file mode 100644 index 0000000..d1fcf3d --- /dev/null +++ b/commons/src/test/java/com/twitter/common/thrift/ThriftFactoryTest.java @@ -0,0 +1,245 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.collect.ImmutableSet; +import com.google.common.testing.TearDown; +import com.google.common.testing.junit4.TearDownTestCase; + +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.async.TAsyncClient; +import org.apache.thrift.async.TAsyncClientManager; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TNonblockingTransport; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.net.pool.DynamicHostSet; +import com.twitter.common.thrift.ThriftFactoryTest.GoodService.AsyncIface; +import com.twitter.thrift.ServiceInstance; + +/** + * @author John Sirois + */ +public class ThriftFactoryTest extends TearDownTestCase { + + private static final Logger LOG = Logger.getLogger(ThriftFactoryTest.class.getName()); + private IMocksControl control; + + static class GoodService { + public interface Iface { + String doWork() throws TResourceExhaustedException; + } + + public interface AsyncIface { + void doWork(AsyncMethodCallback<String> callback); + } + + public static final String DONE = "done"; + + public static class Client implements Iface { + public Client(TProtocol protocol) { + assertNotNull(protocol); + } + + @Override public String doWork() throws TResourceExhaustedException { + return DONE; + } + } + + public static class AsyncClient extends TAsyncClient implements AsyncIface { + public AsyncClient(TProtocolFactory factory, TAsyncClientManager manager, + TNonblockingTransport transport) { + super(factory, manager, transport); + assertNotNull(factory); + assertNotNull(manager); + assertNotNull(transport); + } + + @Override public void doWork(AsyncMethodCallback<String> callback) { + callback.onComplete(DONE); + } + } + } + + static class BadService { + public interface Iface { + void doWork(); + } + public interface AsyncIface { + void doWork(AsyncMethodCallback<Void> callback); + } + + public static class Client implements Iface { + @Override public void doWork() { + throw new UnsupportedOperationException(); + } + } + } + + private ImmutableSet<InetSocketAddress> endpoints; + + @Before + public void setUp() throws Exception { + control = EasyMock.createControl(); + endpoints = ImmutableSet.of(new InetSocketAddress(5555)); + } + + @Test(expected = NullPointerException.class) + public void testNullServiceInterface() { + ThriftFactory.create(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadServiceInterface() { + ThriftFactory.create(GoodService.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadServiceImpl() throws ThriftFactory.ThriftFactoryException { + ThriftFactory.<BadService.Iface>create(BadService.Iface.class) + .build(endpoints); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadAsyncServiceImpl() throws ThriftFactory.ThriftFactoryException { + ThriftFactory.<BadService.AsyncIface>create(BadService.AsyncIface.class) + .useFramedTransport(true) + .buildAsync(endpoints); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoBackends() { + ThriftFactory.create(GoodService.Iface.class) + .build(ImmutableSet.<InetSocketAddress>of()); + } + + @Test + public void testCreate() throws Exception { + final AtomicReference<Socket> clientConnection = new AtomicReference<Socket>(); + final CountDownLatch connected = new CountDownLatch(1); + final ServerSocket server = new ServerSocket(0); + Thread service = new Thread(new Runnable() { + @Override public void run() { + try { + clientConnection.set(server.accept()); + } catch (IOException e) { + LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e); + } finally { + connected.countDown(); + } + } + }); + service.setDaemon(true); + service.start(); + + try { + final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class) + .withMaxConnectionsPerEndpoint(1) + .build(ImmutableSet.of(new InetSocketAddress(server.getLocalPort()))); + addTearDown(new TearDown() { + @Override public void tearDown() { + thrift.close(); + } + }); + + GoodService.Iface client = thrift.create(); + + assertEquals(GoodService.DONE, client.doWork()); + } finally { + connected.await(); + server.close(); + } + + Socket socket = clientConnection.get(); + assertNotNull(socket); + socket.close(); + } + + @Test(expected = TResourceExhaustedException.class) + public void testCreateEmpty() throws Exception { + @SuppressWarnings("unchecked") + DynamicHostSet<ServiceInstance> emptyHostSet = control.createMock(DynamicHostSet.class); + final Thrift<GoodService.Iface> thrift = ThriftFactory.create(GoodService.Iface.class) + .withMaxConnectionsPerEndpoint(1) + .build(emptyHostSet); + addTearDown(new TearDown() { + @Override public void tearDown() { + thrift.close(); + } + }); + GoodService.Iface client = thrift.create(); + + // This should throw a TResourceExhaustedException + client.doWork(); + } + + @Test + public void testCreateAsync() + throws IOException, InterruptedException, ThriftFactory.ThriftFactoryException { + final String responseHolder[] = new String[] {null}; + final CountDownLatch done = new CountDownLatch(1); + AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() { + @Override + public void onComplete(String response) { + responseHolder[0] = response; + done.countDown(); + } + + @Override + public void onError(Exception throwable) { + responseHolder[0] = throwable.toString(); + done.countDown(); + } + }; + + final Thrift<AsyncIface> thrift = ThriftFactory.create(GoodService.AsyncIface.class) + .withMaxConnectionsPerEndpoint(1) + .useFramedTransport(true) + .buildAsync(ImmutableSet.of(new InetSocketAddress(1234))); + addTearDown(new TearDown() { + @Override public void tearDown() { + thrift.close(); + } + }); + GoodService.AsyncIface client = thrift.builder() + .blocking() + .create(); + + client.doWork(callback); + assertTrue("wasn't called back in time, callback got " + responseHolder[0], + done.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(GoodService.DONE, responseHolder[0]); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java new file mode 100644 index 0000000..eea6b5b --- /dev/null +++ b/commons/src/test/java/com/twitter/common/thrift/ThriftTest.java @@ -0,0 +1,934 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Function; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.easymock.IMocksControl; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.twitter.common.base.Command; +import com.twitter.common.net.loadbalancing.LoadBalancer; +import com.twitter.common.net.loadbalancing.RequestTracker; +import com.twitter.common.net.pool.Connection; +import com.twitter.common.net.pool.ObjectPool; +import com.twitter.common.net.pool.ResourceExhaustedException; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stat; +import com.twitter.common.stats.Stats; +import com.twitter.common.thrift.callers.RetryingCaller; +import com.twitter.common.thrift.testing.MockTSocket; +import com.twitter.common.util.concurrent.ForwardingExecutorService; + +import static org.easymock.EasyMock.and; +import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author John Sirois + */ +public class ThriftTest { + + private static final Amount<Long, Time> ASYNC_CONNECT_TIMEOUT = Amount.of(1L, Time.SECONDS); + + public static class NotFoundException extends Exception {} + + public interface TestService { + int calculateMass(String profileName) throws NotFoundException, TException; + } + + public interface TestServiceAsync { + void calculateMass(String profileName, AsyncMethodCallback callback) throws TException; + } + + private IMocksControl control; + private ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool; + private Function<TTransport, TestService> clientFactory; + private Function<TTransport, TestServiceAsync> asyncClientFactory; + private RequestTracker<InetSocketAddress> requestTracker; + + private AsyncMethodCallback<Integer> callback; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + control = EasyMock.createControl(); + + this.connectionPool = control.createMock(ObjectPool.class); + this.clientFactory = control.createMock(Function.class); + this.asyncClientFactory = control.createMock(Function.class); + this.requestTracker = control.createMock(LoadBalancer.class); + + this.callback = control.createMock(AsyncMethodCallback.class); + } + + @After + public void after() { + Stats.flush(); + } + + @Test + public void testDoCallNoDeadline() throws Exception { + TestService testService = expectServiceCall(false); + expect(testService.calculateMass("jake")).andReturn(42); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + int userMass = thrift.builder().blocking().create().calculateMass("jake"); + + assertEquals(42, userMass); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testDoCallAsync() throws Exception { + // Capture the callback that Thift has wrapped around our callback. + Capture<AsyncMethodCallback<Integer>> callbackCapture = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + // Verifies that our callback was called. + callback.onComplete(42); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + // Mimicks the async response from the server. + callbackCapture.getValue().onComplete(42); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testDoCallServiceException() throws Exception { + TestService testService = expectServiceCall(true); + NotFoundException notFoundException = new NotFoundException(); + expect(testService.calculateMass("jake")).andThrow(notFoundException); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + try { + thrift.builder().blocking().create().calculateMass("jake"); + fail("Expected service custom exception to bubble unmodified"); + } catch (NotFoundException e) { + assertSame(notFoundException, e); + } + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testDoCallAsyncServiceException() throws Exception { + NotFoundException notFoundException = new NotFoundException(); + + // Capture the callback that Thift has wrapped around our callback. + Capture<AsyncMethodCallback<Integer>> callbackCapture = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // Verifies that our callback was called. + callback.onError(notFoundException); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + // Mimicks the async response from the server. + callbackCapture.getValue().onError(notFoundException); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testDoCallThriftException() throws Exception { + Capture<TTransport> transportCapture = new Capture<TTransport>(); + TestService testService = expectThriftError(transportCapture); + TTransportException tException = new TTransportException(); + expect(testService.calculateMass("jake")).andThrow(tException); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + try { + thrift.builder().blocking().create().calculateMass("jake"); + fail("Expected thrift exception to bubble unmodified"); + } catch (TException e) { + assertSame(tException, e); + } + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + assertTrue(transportCapture.hasCaptured()); + assertFalse("Expected the transport to be forcibly closed when a thrift error is encountered", + transportCapture.getValue().isOpen()); + + control.verify(); + } + + @Test + public void doCallAsyncThriftException() throws Exception { + TTransportException tException = new TTransportException(); + + expectAsyncServiceCall(true).calculateMass(eq("jake"), (AsyncMethodCallback) anyObject()); + expectLastCall().andThrow(tException); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + callback.onError(tException); + + control.replay(); + + thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDisallowsAsyncWithDeadline() { + Config config = Config.builder() + .withRequestTimeout(Amount.of(1L, Time.SECONDS)) + .create(); + + new Thrift<TestServiceAsync>(config, connectionPool, requestTracker, + "foo", TestServiceAsync.class, asyncClientFactory, true, false).create(); + } + + @Test + public void testDoCallDeadlineMet() throws Exception { + TestService testService = expectServiceCall(false); + expect(testService.calculateMass("jake")).andReturn(42); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Thrift<TestService> thrift = createThrift(executorService); + + control.replay(); + + int userMass = thrift.builder().withRequestTimeout(Amount.of(1L, Time.DAYS)).create() + .calculateMass("jake"); + + assertEquals(42, userMass); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + @Ignore("Flaky: https://trac.twitter.com/twttr/ticket/11474") + public void testDoCallDeadlineExpired() throws Exception { + TestService testService = expectServiceCall(true); + + // Setup a way to verify the callable was cancelled by Thrift when timeout elapsed + final CountDownLatch remoteCallComplete = new CountDownLatch(1); + final CountDownLatch remoteCallStarted = new CountDownLatch(1); + final Command verifyCancelled = control.createMock(Command.class); + verifyCancelled.execute(); + final Object block = new Object(); + expect(testService.calculateMass("jake")).andAnswer(new IAnswer<Integer>() { + @Override public Integer answer() throws TException { + try { + synchronized (block) { + remoteCallStarted.countDown(); + block.wait(); + } + fail("Expected late work to be cancelled and interrupted"); + } catch (InterruptedException e) { + verifyCancelled.execute(); + } finally { + remoteCallComplete.countDown(); + } + throw new TTransportException(); + } + }); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.TIMEOUT), anyLong()); + + ExecutorService executorService = + new ForwardingExecutorService<ExecutorService>(Executors.newSingleThreadExecutor()) { + @Override public <T> Future<T> submit(Callable<T> task) { + Future<T> future = super.submit(task); + + // make sure the task is started so we can verify it gets cancelled + try { + remoteCallStarted.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return future; + } + }; + Thrift<TestService> thrift = createThrift(executorService); + + control.replay(); + + try { + thrift.builder().withRequestTimeout(Amount.of(1L, Time.NANOSECONDS)).create() + .calculateMass("jake"); + fail("Expected a timeout"); + } catch (TTimeoutException e) { + // expected + } finally { + remoteCallComplete.await(); + } + + assertRequestsTotal(thrift, 0); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 1); + + control.verify(); + } + + @Test + public void testRetriesNoProblems() throws Exception { + expect(expectServiceCall(false).calculateMass("jake")).andReturn(42); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = thrift.builder().blocking().withRetries(1).create(); + + assertEquals(42, testService.calculateMass("jake")); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testAsyncRetriesNoProblems() throws Exception { + // Capture the callback that Thift has wrapped around our callback. + Capture<AsyncMethodCallback<Integer>> callbackCapture = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + // Verifies that our callback was called. + callback.onComplete(42); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + // Mimicks the async response from the server. + callbackCapture.getValue().onComplete(42); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testRetriesRecover() throws Exception { + // 1st call + expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 1st retry recovers + expect(expectServiceCall(false).calculateMass("jake")).andReturn(42); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = thrift.builder().blocking().withRetries(1).create(); + + assertEquals(42, testService.calculateMass("jake")); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testAsyncRetriesRecover() throws Exception { + // Capture the callback that Thift has wrapped around our callback. + Capture<AsyncMethodCallback<Integer>> callbackCapture = + new Capture<AsyncMethodCallback<Integer>>(); + + // 1st call + expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture)); + expectLastCall().andThrow(new TTransportException()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 1st retry recovers + expectAsyncServiceRetry(false).calculateMass(eq("jake"), capture(callbackCapture)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); + + // Verifies that our callback was called. + callback.onComplete(42); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + // Mimicks the async response from the server. + callbackCapture.getValue().onComplete(42); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 0); + assertReconnectsTotal(thrift, 0); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testRetriesFailure() throws Exception { + // 1st call + expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 1st retry + expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 2nd retry + TTransportException finalRetryException = new TTransportException(); + expect(expectServiceCall(true).calculateMass("jake")).andThrow(finalRetryException); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = thrift.builder().blocking().withRetries(2).create(); + + try { + testService.calculateMass("jake"); + fail("Expected an exception to be thrown since all retires failed"); + } catch (TException e) { + assertSame(finalRetryException, e); + } + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testAsyncRetriesFailure() throws Exception { + // 1st call + Capture<AsyncMethodCallback<Integer>> callbackCapture1 = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 1st retry + Capture<AsyncMethodCallback<Integer>> callbackCapture2 = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // 2nd retry + Capture<AsyncMethodCallback<Integer>> callbackCapture3 = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture3)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // Verifies that our callback was called. + TTransportException returnedException = new TTransportException(); + callback.onError(returnedException); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + thrift.builder().withRetries(2).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + callbackCapture1.getValue().onError(new TTransportException()); + callbackCapture2.getValue().onError(new IOException()); + callbackCapture3.getValue().onError(returnedException); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testRetrySelection() throws Exception { + expect(expectServiceCall(true).calculateMass("jake")).andThrow(new NotFoundException()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // verify subclasses pass the retry filter + class HopelesslyLost extends NotFoundException {} + expect(expectServiceCall(true).calculateMass("jake")).andThrow(new HopelesslyLost()); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + TTransportException nonRetryableException = new TTransportException(); + expect(expectServiceCall(true).calculateMass("jake")).andThrow(nonRetryableException); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = + thrift.builder().blocking().withRetries(2).retryOn(NotFoundException.class).create(); + + try { + testService.calculateMass("jake"); + fail("Expected n exception to be thrown since all retires failed"); + } catch (TException e) { + assertSame(nonRetryableException, e); + } + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testAsyncRetrySelection() throws Exception { + // verify subclasses pass the retry filter + class HopelesslyLost extends NotFoundException {} + Capture<AsyncMethodCallback<Integer>> callbackCapture1 = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + Capture<AsyncMethodCallback<Integer>> callbackCapture2 = + new Capture<AsyncMethodCallback<Integer>>(); + expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2)); + requestTracker.requestResult( + (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); + + // Verifies that our callback was called. + TTransportException nonRetryableException = new TTransportException(); + callback.onError(nonRetryableException); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + control.replay(); + + TestServiceAsync testService = thrift.builder() + .withRetries(2) + .retryOn(NotFoundException.class) + .withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create(); + + testService.calculateMass("jake", callback); + callbackCapture1.getValue().onError(new HopelesslyLost()); + callbackCapture2.getValue().onError(nonRetryableException); + + assertRequestsTotal(thrift, 1); + assertErrorsTotal(thrift, 1); + assertReconnectsTotal(thrift, 1); + assertTimeoutsTotal(thrift, 0); + + control.verify(); + } + + @Test + public void testResourceExhausted() throws Exception { + expectConnectionPoolResourceExhausted(Config.DEFAULT_CONNECT_TIMEOUT); + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = thrift.builder().blocking().create(); + + try { + testService.calculateMass("jake"); + fail("Expected a TResourceExhaustedException."); + } catch (TResourceExhaustedException e) { + // Expected + } + + control.verify(); + } + + @Test + public void testAsyncResourceExhausted() throws Exception { + expectConnectionPoolResourceExhausted(ASYNC_CONNECT_TIMEOUT); + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + callback.onError(and(anyObject(), isA(TResourceExhaustedException.class))); + + control.replay(); + + TestServiceAsync testService = thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT) + .create(); + + testService.calculateMass("jake", callback); + + control.verify(); + } + + @Test + public void testAsyncDoesNotRetryResourceExhausted() throws Exception { + expect(connectionPool.get(ASYNC_CONNECT_TIMEOUT)).andThrow( + new ResourceExhaustedException("first")); + + Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); + + callback.onError(and(anyObject(), isA(TResourceExhaustedException.class))); + + control.replay(); + + thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() + .calculateMass("jake", callback); + + control.verify(); + } + + @Test + public void testConnectionPoolTimeout() throws Exception { + expectConnectionPoolTimeout(Config.DEFAULT_CONNECT_TIMEOUT); + Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); + + control.replay(); + + TestService testService = + thrift.builder().blocking().create(); + + try { + testService.calculateMass("jake"); + fail("Expected a TTimeoutException."); + } catch (TTimeoutException e) { + // Expected + } + + control.verify(); + } + + @Test + public void testDoCallDeadlineNoThreads() throws Exception { + control.replay(); + + ExecutorService executorService = + new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()); + + Thrift<TestService> thrift = createThrift(executorService); + + final TestService service = + thrift.builder().noRetries().withRequestTimeout(Amount.of(1L, Time.SECONDS)).create(); + + final CountDownLatch remoteCallComplete = new CountDownLatch(1); + final CountDownLatch remoteCallStarted = new CountDownLatch(1); + + Future<Integer> result = executorService.submit(new Callable<Integer>() { + @Override public Integer call() throws Exception { + remoteCallStarted.countDown(); + remoteCallComplete.await(); + return service.calculateMass("jake"); + } + }); + + remoteCallStarted.await(); + try { + service.calculateMass("jake"); + fail("Expected no available threads to trigger resource exhausted"); + } catch (TResourceExhaustedException e) { + // expected + } finally { + remoteCallComplete.countDown(); + } + + try { + result.get(); + fail("Expected no available threads to trigger resource exhausted"); + } catch (ExecutionException e) { + assertEquals(TResourceExhaustedException.class, e.getCause().getClass()); + } + + control.verify(); + } + + private ExecutorService expectUnusedExecutorService() { + return control.createMock(ExecutorService.class); + } + + private static final String STAT_REQUESTS = "requests_events"; + private static final String STAT_ERRORS = "errors"; + private static final String STAT_RECONNECTS = "reconnects"; + private static final String STAT_TIMEOUTS = "timeouts"; + + private void assertRequestsTotal(Thrift<?> thrift, int total) { + assertRequestStatValue(STAT_REQUESTS, total); + } + + private void assertErrorsTotal(Thrift<?> thrift, int total) { + assertRequestStatValue(STAT_ERRORS, total); + } + + private void assertReconnectsTotal(Thrift<?> thrift, int total) { + assertRequestStatValue(STAT_RECONNECTS, total); + } + + private void assertTimeoutsTotal(Thrift<?> thrift, int total) { + assertRequestStatValue(STAT_TIMEOUTS, total); + } + + private void assertRequestStatValue(String statName, long expectedValue) { + + Stat<Long> var = Stats.getVariable("foo_calculateMass_" + statName); + + assertNotNull(var); + assertEquals(expectedValue, (long) var.read()); + } + + private Thrift<TestService> createThrift(ExecutorService executorService) { + return new Thrift<TestService>(executorService, connectionPool, requestTracker, "foo", + TestService.class, clientFactory, false, false); + } + + private Thrift<TestServiceAsync> createAsyncThrift(ExecutorService executorService) { + return new Thrift<TestServiceAsync>(executorService, connectionPool, requestTracker, "foo", + TestServiceAsync.class, asyncClientFactory, true, false); + } + + private TestService expectServiceCall(boolean withFailure) + throws ResourceExhaustedException, TimeoutException { + Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet(); + return expectServiceCall(connection, withFailure); + } + + private TestServiceAsync expectAsyncServiceCall(boolean withFailure) + throws ResourceExhaustedException, TimeoutException { + return expectAsyncServiceCall(expectConnectionPoolGet(ASYNC_CONNECT_TIMEOUT), withFailure); + } + + private TestServiceAsync expectAsyncServiceRetry(boolean withFailure) + throws ResourceExhaustedException, TimeoutException { + return expectAsyncServiceCall( + expectConnectionPoolGet(RetryingCaller.NONBLOCKING_TIMEOUT), withFailure); + } + + private TestService expectThriftError(Capture<TTransport> transportCapture) + throws ResourceExhaustedException, TimeoutException { + Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet(); + return expectServiceCall(connection, transportCapture, true); + } + + private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet() + throws ResourceExhaustedException, TimeoutException { + Connection<TTransport, InetSocketAddress> connection = createConnection(); + expect(connectionPool.get(Config.DEFAULT_CONNECT_TIMEOUT)).andReturn(connection); + return connection; + } + + private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet( + Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException { + Connection<TTransport, InetSocketAddress> connection = createConnection(); + expect(connectionPool.get(timeout)).andReturn(connection); + return connection; + } + + private void expectConnectionPoolResourceExhausted(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + expect(connectionPool.get(timeout)).andThrow(new ResourceExhaustedException("")); + } + + private void expectConnectionPoolTimeout(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + expect(connectionPool.get(timeout)).andThrow(new TimeoutException()); + } + + private Connection<TTransport, InetSocketAddress> createConnection() { + return new TTransportConnection(new MockTSocket(), + InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT)); + } + + private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection, + boolean withFailure) { + return expectServiceCall(connection, null, withFailure); + } + + private TestServiceAsync expectAsyncServiceCall( + Connection<TTransport, InetSocketAddress> connection, boolean withFailure) { + return expectAsyncServiceCall(connection, null, withFailure); + } + + private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection, + Capture<TTransport> transportCapture, boolean withFailure) { + + TestService testService = control.createMock(TestService.class); + if (connection != null) { + IExpectationSetters<TestService> expectApply = transportCapture == null + ? expect(clientFactory.apply(EasyMock.isA(TTransport.class))) + : expect(clientFactory.apply(EasyMock.capture(transportCapture))); + expectApply.andReturn(testService); + + if (withFailure) { + connectionPool.remove(connection); + } else { + connectionPool.release(connection); + } + } + return testService; + } + + private TestServiceAsync expectAsyncServiceCall( + Connection<TTransport, InetSocketAddress> connection, + Capture<TTransport> transportCapture, boolean withFailure) { + + TestServiceAsync testService = control.createMock(TestServiceAsync.class); + if (connection != null) { + IExpectationSetters<TestServiceAsync> expectApply = transportCapture == null + ? expect(asyncClientFactory.apply(EasyMock.isA(TTransport.class))) + : expect(asyncClientFactory.apply(EasyMock.capture(transportCapture))); + expectApply.andReturn(testService); + + if (withFailure) { + connectionPool.remove(connection); + } else { + connectionPool.release(connection); + } + } + return testService; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java new file mode 100644 index 0000000..cf55afe --- /dev/null +++ b/commons/src/test/java/com/twitter/common/thrift/callers/AbstractCallerTest.java @@ -0,0 +1,59 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift.callers; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import org.junit.Before; + +import java.lang.reflect.Method; + +import static org.easymock.EasyMock.expect; + +/** + * Test framework for testing callers. + * + * @author William Farner + */ +public abstract class AbstractCallerTest extends EasyMockTest { + protected final Amount<Long, Time> CONNECT_TIMEOUT = Amount.of(1L, Time.HOURS); + + protected Caller caller; + + protected Method methodA; + protected Object[] argsA; + + @Before + public final void callerSetUp() throws Exception { + caller = createMock(Caller.class); + methodA = Object.class.getMethod("toString"); + argsA = new Object[] {}; + } + + protected String call(Caller caller) throws Throwable { + return (String) caller.call(methodA, argsA, null, CONNECT_TIMEOUT); + } + + protected void expectCall(String returnValue) throws Throwable { + expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andReturn(returnValue); + } + + protected void expectCall(Throwable thrown) throws Throwable { + expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andThrow(thrown); + } +}
