http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/thrift/ThriftTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/thrift/ThriftTest.java b/commons/src/test/java/org/apache/aurora/common/thrift/ThriftTest.java deleted file mode 100644 index 4b96308..0000000 --- a/commons/src/test/java/org/apache/aurora/common/thrift/ThriftTest.java +++ /dev/null @@ -1,931 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import com.google.common.base.Function; - -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IExpectationSetters; -import org.easymock.IMocksControl; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.loadbalancing.LoadBalancer; -import org.apache.aurora.common.net.loadbalancing.RequestTracker; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ObjectPool; -import org.apache.aurora.common.net.pool.ResourceExhaustedException; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.Stat; -import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.thrift.callers.RetryingCaller; -import org.apache.aurora.common.thrift.testing.MockTSocket; -import org.apache.aurora.common.util.concurrent.ForwardingExecutorService; - -import static org.easymock.EasyMock.and; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author John Sirois - */ -public class ThriftTest { - - private static final Amount<Long, Time> ASYNC_CONNECT_TIMEOUT = Amount.of(1L, Time.SECONDS); - - public static class NotFoundException extends Exception {} - - public interface TestService { - int calculateMass(String profileName) throws NotFoundException, TException; - } - - public interface TestServiceAsync { - void calculateMass(String profileName, AsyncMethodCallback callback) throws TException; - } - - private IMocksControl control; - private ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool; - private Function<TTransport, TestService> clientFactory; - private Function<TTransport, TestServiceAsync> asyncClientFactory; - private RequestTracker<InetSocketAddress> requestTracker; - - private AsyncMethodCallback<Integer> callback; - - @SuppressWarnings("unchecked") - @Before - public void setUp() throws Exception { - control = EasyMock.createControl(); - - this.connectionPool = control.createMock(ObjectPool.class); - this.clientFactory = control.createMock(Function.class); - this.asyncClientFactory = control.createMock(Function.class); - this.requestTracker = control.createMock(LoadBalancer.class); - - this.callback = control.createMock(AsyncMethodCallback.class); - } - - @After - public void after() { - Stats.flush(); - } - - @Test - public void testDoCallNoDeadline() throws Exception { - TestService testService = expectServiceCall(false); - expect(testService.calculateMass("jake")).andReturn(42); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - int userMass = thrift.builder().blocking().create().calculateMass("jake"); - - assertEquals(42, userMass); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testDoCallAsync() throws Exception { - // Capture the callback that Thift has wrapped around our callback. - Capture<AsyncMethodCallback<Integer>> callbackCapture = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - // Verifies that our callback was called. - callback.onComplete(42); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - // Mimicks the async response from the server. - callbackCapture.getValue().onComplete(42); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testDoCallServiceException() throws Exception { - TestService testService = expectServiceCall(true); - NotFoundException notFoundException = new NotFoundException(); - expect(testService.calculateMass("jake")).andThrow(notFoundException); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - try { - thrift.builder().blocking().create().calculateMass("jake"); - fail("Expected service custom exception to bubble unmodified"); - } catch (NotFoundException e) { - assertSame(notFoundException, e); - } - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testDoCallAsyncServiceException() throws Exception { - NotFoundException notFoundException = new NotFoundException(); - - // Capture the callback that Thift has wrapped around our callback. - Capture<AsyncMethodCallback<Integer>> callbackCapture = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // Verifies that our callback was called. - callback.onError(notFoundException); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - // Mimicks the async response from the server. - callbackCapture.getValue().onError(notFoundException); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testDoCallThriftException() throws Exception { - Capture<TTransport> transportCapture = new Capture<TTransport>(); - TestService testService = expectThriftError(transportCapture); - TTransportException tException = new TTransportException(); - expect(testService.calculateMass("jake")).andThrow(tException); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - try { - thrift.builder().blocking().create().calculateMass("jake"); - fail("Expected thrift exception to bubble unmodified"); - } catch (TException e) { - assertSame(tException, e); - } - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - assertTrue(transportCapture.hasCaptured()); - assertFalse("Expected the transport to be forcibly closed when a thrift error is encountered", - transportCapture.getValue().isOpen()); - - control.verify(); - } - - @Test - public void doCallAsyncThriftException() throws Exception { - TTransportException tException = new TTransportException(); - - expectAsyncServiceCall(true).calculateMass(eq("jake"), (AsyncMethodCallback) anyObject()); - expectLastCall().andThrow(tException); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - callback.onError(tException); - - control.replay(); - - thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test(expected = IllegalArgumentException.class) - public void testDisallowsAsyncWithDeadline() { - Config config = Config.builder() - .withRequestTimeout(Amount.of(1L, Time.SECONDS)) - .create(); - - new Thrift<TestServiceAsync>(config, connectionPool, requestTracker, - "foo", TestServiceAsync.class, asyncClientFactory, true, false).create(); - } - - @Test - public void testDoCallDeadlineMet() throws Exception { - TestService testService = expectServiceCall(false); - expect(testService.calculateMass("jake")).andReturn(42); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - ExecutorService executorService = Executors.newSingleThreadExecutor(); - Thrift<TestService> thrift = createThrift(executorService); - - control.replay(); - - int userMass = thrift.builder().withRequestTimeout(Amount.of(1L, Time.DAYS)).create() - .calculateMass("jake"); - - assertEquals(42, userMass); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - @Ignore("Flaky: https://trac.twitter.com/twttr/ticket/11474") - public void testDoCallDeadlineExpired() throws Exception { - TestService testService = expectServiceCall(true); - - // Setup a way to verify the callable was cancelled by Thrift when timeout elapsed - final CountDownLatch remoteCallComplete = new CountDownLatch(1); - final CountDownLatch remoteCallStarted = new CountDownLatch(1); - final Command verifyCancelled = control.createMock(Command.class); - verifyCancelled.execute(); - final Object block = new Object(); - expect(testService.calculateMass("jake")).andAnswer(new IAnswer<Integer>() { - @Override public Integer answer() throws TException { - try { - synchronized (block) { - remoteCallStarted.countDown(); - block.wait(); - } - fail("Expected late work to be cancelled and interrupted"); - } catch (InterruptedException e) { - verifyCancelled.execute(); - } finally { - remoteCallComplete.countDown(); - } - throw new TTransportException(); - } - }); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.TIMEOUT), anyLong()); - - ExecutorService executorService = - new ForwardingExecutorService<ExecutorService>(Executors.newSingleThreadExecutor()) { - @Override public <T> Future<T> submit(Callable<T> task) { - Future<T> future = super.submit(task); - - // make sure the task is started so we can verify it gets cancelled - try { - remoteCallStarted.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - return future; - } - }; - Thrift<TestService> thrift = createThrift(executorService); - - control.replay(); - - try { - thrift.builder().withRequestTimeout(Amount.of(1L, Time.NANOSECONDS)).create() - .calculateMass("jake"); - fail("Expected a timeout"); - } catch (TTimeoutException e) { - // expected - } finally { - remoteCallComplete.await(); - } - - assertRequestsTotal(thrift, 0); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 1); - - control.verify(); - } - - @Test - public void testRetriesNoProblems() throws Exception { - expect(expectServiceCall(false).calculateMass("jake")).andReturn(42); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = thrift.builder().blocking().withRetries(1).create(); - - assertEquals(42, testService.calculateMass("jake")); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testAsyncRetriesNoProblems() throws Exception { - // Capture the callback that Thift has wrapped around our callback. - Capture<AsyncMethodCallback<Integer>> callbackCapture = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceCall(false).calculateMass(eq("jake"), capture(callbackCapture)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - // Verifies that our callback was called. - callback.onComplete(42); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - // Mimicks the async response from the server. - callbackCapture.getValue().onComplete(42); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testRetriesRecover() throws Exception { - // 1st call - expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 1st retry recovers - expect(expectServiceCall(false).calculateMass("jake")).andReturn(42); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = thrift.builder().blocking().withRetries(1).create(); - - assertEquals(42, testService.calculateMass("jake")); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testAsyncRetriesRecover() throws Exception { - // Capture the callback that Thift has wrapped around our callback. - Capture<AsyncMethodCallback<Integer>> callbackCapture = - new Capture<AsyncMethodCallback<Integer>>(); - - // 1st call - expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture)); - expectLastCall().andThrow(new TTransportException()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 1st retry recovers - expectAsyncServiceRetry(false).calculateMass(eq("jake"), capture(callbackCapture)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.SUCCESS), anyLong()); - - // Verifies that our callback was called. - callback.onComplete(42); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - // Mimicks the async response from the server. - callbackCapture.getValue().onComplete(42); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 0); - assertReconnectsTotal(thrift, 0); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testRetriesFailure() throws Exception { - // 1st call - expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 1st retry - expect(expectServiceCall(true).calculateMass("jake")).andThrow(new TTransportException()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 2nd retry - TTransportException finalRetryException = new TTransportException(); - expect(expectServiceCall(true).calculateMass("jake")).andThrow(finalRetryException); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = thrift.builder().blocking().withRetries(2).create(); - - try { - testService.calculateMass("jake"); - fail("Expected an exception to be thrown since all retires failed"); - } catch (TException e) { - assertSame(finalRetryException, e); - } - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testAsyncRetriesFailure() throws Exception { - // 1st call - Capture<AsyncMethodCallback<Integer>> callbackCapture1 = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 1st retry - Capture<AsyncMethodCallback<Integer>> callbackCapture2 = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // 2nd retry - Capture<AsyncMethodCallback<Integer>> callbackCapture3 = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture3)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // Verifies that our callback was called. - TTransportException returnedException = new TTransportException(); - callback.onError(returnedException); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - thrift.builder().withRetries(2).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - callbackCapture1.getValue().onError(new TTransportException()); - callbackCapture2.getValue().onError(new IOException()); - callbackCapture3.getValue().onError(returnedException); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testRetrySelection() throws Exception { - expect(expectServiceCall(true).calculateMass("jake")).andThrow(new NotFoundException()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // verify subclasses pass the retry filter - class HopelesslyLost extends NotFoundException {} - expect(expectServiceCall(true).calculateMass("jake")).andThrow(new HopelesslyLost()); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - TTransportException nonRetryableException = new TTransportException(); - expect(expectServiceCall(true).calculateMass("jake")).andThrow(nonRetryableException); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = - thrift.builder().blocking().withRetries(2).retryOn(NotFoundException.class).create(); - - try { - testService.calculateMass("jake"); - fail("Expected n exception to be thrown since all retires failed"); - } catch (TException e) { - assertSame(nonRetryableException, e); - } - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testAsyncRetrySelection() throws Exception { - // verify subclasses pass the retry filter - class HopelesslyLost extends NotFoundException {} - Capture<AsyncMethodCallback<Integer>> callbackCapture1 = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceCall(true).calculateMass(eq("jake"), capture(callbackCapture1)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - Capture<AsyncMethodCallback<Integer>> callbackCapture2 = - new Capture<AsyncMethodCallback<Integer>>(); - expectAsyncServiceRetry(true).calculateMass(eq("jake"), capture(callbackCapture2)); - requestTracker.requestResult( - (InetSocketAddress) anyObject(), eq(RequestTracker.RequestResult.FAILED), anyLong()); - - // Verifies that our callback was called. - TTransportException nonRetryableException = new TTransportException(); - callback.onError(nonRetryableException); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - control.replay(); - - TestServiceAsync testService = thrift.builder() - .withRetries(2) - .retryOn(NotFoundException.class) - .withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create(); - - testService.calculateMass("jake", callback); - callbackCapture1.getValue().onError(new HopelesslyLost()); - callbackCapture2.getValue().onError(nonRetryableException); - - assertRequestsTotal(thrift, 1); - assertErrorsTotal(thrift, 1); - assertReconnectsTotal(thrift, 1); - assertTimeoutsTotal(thrift, 0); - - control.verify(); - } - - @Test - public void testResourceExhausted() throws Exception { - expectConnectionPoolResourceExhausted(Config.DEFAULT_CONNECT_TIMEOUT); - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = thrift.builder().blocking().create(); - - try { - testService.calculateMass("jake"); - fail("Expected a TResourceExhaustedException."); - } catch (TResourceExhaustedException e) { - // Expected - } - - control.verify(); - } - - @Test - public void testAsyncResourceExhausted() throws Exception { - expectConnectionPoolResourceExhausted(ASYNC_CONNECT_TIMEOUT); - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - callback.onError(and(anyObject(), isA(TResourceExhaustedException.class))); - - control.replay(); - - TestServiceAsync testService = thrift.builder().withConnectTimeout(ASYNC_CONNECT_TIMEOUT) - .create(); - - testService.calculateMass("jake", callback); - - control.verify(); - } - - @Test - public void testAsyncDoesNotRetryResourceExhausted() throws Exception { - expect(connectionPool.get(ASYNC_CONNECT_TIMEOUT)).andThrow( - new ResourceExhaustedException("first")); - - Thrift<TestServiceAsync> thrift = createAsyncThrift(expectUnusedExecutorService()); - - callback.onError(and(anyObject(), isA(TResourceExhaustedException.class))); - - control.replay(); - - thrift.builder().withRetries(1).withConnectTimeout(ASYNC_CONNECT_TIMEOUT).create() - .calculateMass("jake", callback); - - control.verify(); - } - - @Test - public void testConnectionPoolTimeout() throws Exception { - expectConnectionPoolTimeout(Config.DEFAULT_CONNECT_TIMEOUT); - Thrift<TestService> thrift = createThrift(expectUnusedExecutorService()); - - control.replay(); - - TestService testService = - thrift.builder().blocking().create(); - - try { - testService.calculateMass("jake"); - fail("Expected a TTimeoutException."); - } catch (TTimeoutException e) { - // Expected - } - - control.verify(); - } - - @Test - public void testDoCallDeadlineNoThreads() throws Exception { - control.replay(); - - ExecutorService executorService = - new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()); - - Thrift<TestService> thrift = createThrift(executorService); - - final TestService service = - thrift.builder().noRetries().withRequestTimeout(Amount.of(1L, Time.SECONDS)).create(); - - final CountDownLatch remoteCallComplete = new CountDownLatch(1); - final CountDownLatch remoteCallStarted = new CountDownLatch(1); - - Future<Integer> result = executorService.submit(new Callable<Integer>() { - @Override public Integer call() throws Exception { - remoteCallStarted.countDown(); - remoteCallComplete.await(); - return service.calculateMass("jake"); - } - }); - - remoteCallStarted.await(); - try { - service.calculateMass("jake"); - fail("Expected no available threads to trigger resource exhausted"); - } catch (TResourceExhaustedException e) { - // expected - } finally { - remoteCallComplete.countDown(); - } - - try { - result.get(); - fail("Expected no available threads to trigger resource exhausted"); - } catch (ExecutionException e) { - assertEquals(TResourceExhaustedException.class, e.getCause().getClass()); - } - - control.verify(); - } - - private ExecutorService expectUnusedExecutorService() { - return control.createMock(ExecutorService.class); - } - - private static final String STAT_REQUESTS = "requests_events"; - private static final String STAT_ERRORS = "errors"; - private static final String STAT_RECONNECTS = "reconnects"; - private static final String STAT_TIMEOUTS = "timeouts"; - - private void assertRequestsTotal(Thrift<?> thrift, int total) { - assertRequestStatValue(STAT_REQUESTS, total); - } - - private void assertErrorsTotal(Thrift<?> thrift, int total) { - assertRequestStatValue(STAT_ERRORS, total); - } - - private void assertReconnectsTotal(Thrift<?> thrift, int total) { - assertRequestStatValue(STAT_RECONNECTS, total); - } - - private void assertTimeoutsTotal(Thrift<?> thrift, int total) { - assertRequestStatValue(STAT_TIMEOUTS, total); - } - - private void assertRequestStatValue(String statName, long expectedValue) { - - Stat<Long> var = Stats.getVariable("foo_calculateMass_" + statName); - - assertNotNull(var); - assertEquals(expectedValue, (long) var.read()); - } - - private Thrift<TestService> createThrift(ExecutorService executorService) { - return new Thrift<TestService>(executorService, connectionPool, requestTracker, "foo", - TestService.class, clientFactory, false, false); - } - - private Thrift<TestServiceAsync> createAsyncThrift(ExecutorService executorService) { - return new Thrift<TestServiceAsync>(executorService, connectionPool, requestTracker, "foo", - TestServiceAsync.class, asyncClientFactory, true, false); - } - - private TestService expectServiceCall(boolean withFailure) - throws ResourceExhaustedException, TimeoutException { - Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet(); - return expectServiceCall(connection, withFailure); - } - - private TestServiceAsync expectAsyncServiceCall(boolean withFailure) - throws ResourceExhaustedException, TimeoutException { - return expectAsyncServiceCall(expectConnectionPoolGet(ASYNC_CONNECT_TIMEOUT), withFailure); - } - - private TestServiceAsync expectAsyncServiceRetry(boolean withFailure) - throws ResourceExhaustedException, TimeoutException { - return expectAsyncServiceCall( - expectConnectionPoolGet(RetryingCaller.NONBLOCKING_TIMEOUT), withFailure); - } - - private TestService expectThriftError(Capture<TTransport> transportCapture) - throws ResourceExhaustedException, TimeoutException { - Connection<TTransport, InetSocketAddress> connection = expectConnectionPoolGet(); - return expectServiceCall(connection, transportCapture, true); - } - - private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet() - throws ResourceExhaustedException, TimeoutException { - Connection<TTransport, InetSocketAddress> connection = createConnection(); - expect(connectionPool.get(Config.DEFAULT_CONNECT_TIMEOUT)).andReturn(connection); - return connection; - } - - private Connection<TTransport, InetSocketAddress> expectConnectionPoolGet( - Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException { - Connection<TTransport, InetSocketAddress> connection = createConnection(); - expect(connectionPool.get(timeout)).andReturn(connection); - return connection; - } - - private void expectConnectionPoolResourceExhausted(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - expect(connectionPool.get(timeout)).andThrow(new ResourceExhaustedException("")); - } - - private void expectConnectionPoolTimeout(Amount<Long, Time> timeout) - throws ResourceExhaustedException, TimeoutException { - expect(connectionPool.get(timeout)).andThrow(new TimeoutException()); - } - - private Connection<TTransport, InetSocketAddress> createConnection() { - return new TTransportConnection(new MockTSocket(), - InetSocketAddress.createUnresolved(MockTSocket.HOST, MockTSocket.PORT)); - } - - private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection, - boolean withFailure) { - return expectServiceCall(connection, null, withFailure); - } - - private TestServiceAsync expectAsyncServiceCall( - Connection<TTransport, InetSocketAddress> connection, boolean withFailure) { - return expectAsyncServiceCall(connection, null, withFailure); - } - - private TestService expectServiceCall(Connection<TTransport, InetSocketAddress> connection, - Capture<TTransport> transportCapture, boolean withFailure) { - - TestService testService = control.createMock(TestService.class); - if (connection != null) { - IExpectationSetters<TestService> expectApply = transportCapture == null - ? expect(clientFactory.apply(EasyMock.isA(TTransport.class))) - : expect(clientFactory.apply(EasyMock.capture(transportCapture))); - expectApply.andReturn(testService); - - if (withFailure) { - connectionPool.remove(connection); - } else { - connectionPool.release(connection); - } - } - return testService; - } - - private TestServiceAsync expectAsyncServiceCall( - Connection<TTransport, InetSocketAddress> connection, - Capture<TTransport> transportCapture, boolean withFailure) { - - TestServiceAsync testService = control.createMock(TestServiceAsync.class); - if (connection != null) { - IExpectationSetters<TestServiceAsync> expectApply = transportCapture == null - ? expect(asyncClientFactory.apply(EasyMock.isA(TTransport.class))) - : expect(asyncClientFactory.apply(EasyMock.capture(transportCapture))); - expectApply.andReturn(testService); - - if (withFailure) { - connectionPool.remove(connection); - } else { - connectionPool.release(connection); - } - } - return testService; - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/thrift/callers/AbstractCallerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/thrift/callers/AbstractCallerTest.java b/commons/src/test/java/org/apache/aurora/common/thrift/callers/AbstractCallerTest.java deleted file mode 100644 index e66a9da..0000000 --- a/commons/src/test/java/org/apache/aurora/common/thrift/callers/AbstractCallerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.junit.Before; - -import java.lang.reflect.Method; - -import static org.easymock.EasyMock.expect; - -/** - * Test framework for testing callers. - * - * @author William Farner - */ -public abstract class AbstractCallerTest extends EasyMockTest { - protected final Amount<Long, Time> CONNECT_TIMEOUT = Amount.of(1L, Time.HOURS); - - protected Caller caller; - - protected Method methodA; - protected Object[] argsA; - - @Before - public final void callerSetUp() throws Exception { - caller = createMock(Caller.class); - methodA = Object.class.getMethod("toString"); - argsA = new Object[] {}; - } - - protected String call(Caller caller) throws Throwable { - return (String) caller.call(methodA, argsA, null, CONNECT_TIMEOUT); - } - - protected void expectCall(String returnValue) throws Throwable { - expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andReturn(returnValue); - } - - protected void expectCall(Throwable thrown) throws Throwable { - expect(caller.call(methodA, argsA, null, CONNECT_TIMEOUT)).andThrow(thrown); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/thrift/callers/DeadlineCallerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/thrift/callers/DeadlineCallerTest.java b/commons/src/test/java/org/apache/aurora/common/thrift/callers/DeadlineCallerTest.java deleted file mode 100644 index 6441bd0..0000000 --- a/commons/src/test/java/org/apache/aurora/common/thrift/callers/DeadlineCallerTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import com.google.common.testing.TearDown; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.thrift.TTimeoutException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.junit.Before; -import org.junit.Test; - -import javax.annotation.Nullable; -import java.lang.reflect.Method; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * TODO(William Farner): Test async. - * - * @author William Farner - */ -public class DeadlineCallerTest extends AbstractCallerTest { - - private static final Amount<Long, Time> DEADLINE = Amount.of(100L, Time.MILLISECONDS); - - private ExecutorService executorService; - - private DeadlineCaller makeDeadline(final boolean shouldTimeOut) { - final CountDownLatch cancelled = new CountDownLatch(1); - if (shouldTimeOut) { - addTearDown(new TearDown() { - @Override public void tearDown() throws Exception { - // This will block forever if cancellation does not occur and interrupt the ~indefinite - // sleep. - cancelled.await(); - } - }); - } - - Caller sleepyCaller = new CallerDecorator(caller, false) { - @Override public Object call(Method method, Object[] args, - @Nullable AsyncMethodCallback callback, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable { - - if (shouldTimeOut) { - try { - Thread.sleep(Long.MAX_VALUE); - fail("Expected late work to be cancelled and interrupted"); - } catch (InterruptedException e) { - cancelled.countDown(); - } - } - - return caller.call(method, args, callback, connectTimeoutOverride); - } - }; - - return new DeadlineCaller(sleepyCaller, false, executorService, DEADLINE); - } - - @Before - public void setUp() { - executorService = Executors.newSingleThreadExecutor(); - } - - @Test - public void testSuccess() throws Throwable { - DeadlineCaller deadline = makeDeadline(false); - expectCall("foo"); - - control.replay(); - - assertThat(call(deadline), is("foo")); - } - - @Test - public void testException() throws Throwable { - DeadlineCaller deadline = makeDeadline(false); - Throwable exception = new IllegalArgumentException(); - expectCall(exception); - - control.replay(); - - try { - call(deadline); - fail(); - } catch (Throwable t) { - assertThat(t, is(exception)); - } - } - - @Test(expected = TTimeoutException.class) - public void testExceedsDeadline() throws Throwable { - DeadlineCaller deadline = makeDeadline(true); - - // No call expected, since we time out before it can be made. - - control.replay(); - - call(deadline); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/thrift/callers/RetryingCallerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/thrift/callers/RetryingCallerTest.java b/commons/src/test/java/org/apache/aurora/common/thrift/callers/RetryingCallerTest.java deleted file mode 100644 index afe4dc9..0000000 --- a/commons/src/test/java/org/apache/aurora/common/thrift/callers/RetryingCallerTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import java.lang.reflect.Method; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableSet; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.stats.StatsProvider; - -import static org.easymock.EasyMock.expect; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * TODO(William Farner): Test async. - * - * @author William Farner - */ -public class RetryingCallerTest extends AbstractCallerTest { - - private static final int NUM_RETRIES = 2; - - private static final ImmutableSet<Class<? extends Exception>> NO_RETRYABLE = - ImmutableSet.of(); - private static final ImmutableSet<Class<? extends Exception>> RETRYABLE = - ImmutableSet.<Class<? extends Exception>>of(IllegalArgumentException.class); - - private StatsProvider statsProvider; - - @Before - public void mySetUp() { - statsProvider = createMock(StatsProvider.class); - } - - @Test - public void testSuccess() throws Throwable { - expectCall("foo"); - - control.replay(); - - RetryingCaller retry = makeRetry(false, NO_RETRYABLE); - assertThat(call(retry), is("foo")); - assertThat(memoizeGetCounter.get(methodA).get(), is(0L)); - } - - @Test - public void testException() throws Throwable { - Throwable exception = nonRetryable(); - expectCall(exception); - - control.replay(); - - RetryingCaller retry = makeRetry(false, NO_RETRYABLE); - try { - call(retry); - fail(); - } catch (Throwable t) { - assertThat(t, is(exception)); - } - assertThat(memoizeGetCounter.get(methodA).get(), is(0L)); - } - - @Test - public void testRetriesSuccess() throws Throwable { - expectCall(retryable()); - expectCall(retryable()); - expectCall("foo"); - - control.replay(); - - RetryingCaller retry = makeRetry(false, RETRYABLE); - assertThat(call(retry), is("foo")); - assertThat(memoizeGetCounter.get(methodA).get(), is((long) NUM_RETRIES)); - } - - @Test - public void testRetryLimit() throws Throwable { - expectCall(retryable()); - expectCall(retryable()); - Throwable exception = retryable(); - expectCall(exception); - - control.replay(); - - RetryingCaller retry = makeRetry(false, RETRYABLE); - try { - call(retry); - fail(); - } catch (Throwable t) { - assertThat(t, is(exception)); - } - assertThat(memoizeGetCounter.get(methodA).get(), is(2L)); - } - - private Throwable retryable() { - return new IllegalArgumentException(); - } - - private Throwable nonRetryable() { - return new NullPointerException(); - } - - private LoadingCache<Method, AtomicLong> memoizeGetCounter = CacheBuilder.newBuilder().build( - new CacheLoader<Method, AtomicLong>() { - @Override public AtomicLong load(Method method) { - AtomicLong atomicLong = new AtomicLong(); - expect(statsProvider.makeCounter("test_" + method.getName() + "_retries")) - .andReturn(atomicLong); - return atomicLong; - } - }); - - @Override - protected void expectCall(String returnValue) throws Throwable { - super.expectCall(returnValue); - memoizeGetCounter.get(methodA); - } - - @Override - protected void expectCall(Throwable thrown) throws Throwable { - super.expectCall(thrown); - memoizeGetCounter.get(methodA); - } - - private RetryingCaller makeRetry(boolean async, - ImmutableSet<Class<? extends Exception>> retryableExceptions) { - return new RetryingCaller(caller, async, statsProvider, "test", NUM_RETRIES, - retryableExceptions, false); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/util/LowResClockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/LowResClockTest.java b/commons/src/test/java/org/apache/aurora/common/util/LowResClockTest.java deleted file mode 100644 index f564235..0000000 --- a/commons/src/test/java/org/apache/aurora/common/util/LowResClockTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.aurora.common.util.testing.FakeClock; -import org.easymock.Capture; -import org.easymock.IAnswer; -import org.junit.Test; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import static org.easymock.EasyMock.anyBoolean; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.captureLong; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class LowResClockTest { - - /** - * A FakeClock that overrides the {@link FakeClock#advance(Amount) advance} method to allow a - * co-operating thread to execute a synchronous action via {@link #doOnAdvance(Command)}. - */ - static class WaitingFakeClock extends FakeClock { - private final SynchronousQueue<CountDownLatch> signalQueue = - new SynchronousQueue<CountDownLatch>(); - - @Override - public void advance(Amount<Long, Time> period) { - super.advance(period); - CountDownLatch signal = new CountDownLatch(1); - try { - signalQueue.put(signal); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - try { - signal.await(); - } catch (InterruptedException e) { - // ignore - } - } - - void doOnAdvance(Command action) throws InterruptedException { - CountDownLatch signal = signalQueue.take(); - action.execute(); - signal.countDown(); - } - } - - static class Tick implements Command { - private final Clock clock; - private final long period; - private final Runnable advancer; - private long time; - - Tick(Clock clock, long startTime, long period, Runnable advancer) { - this.clock = clock; - time = startTime; - this.period = period; - this.advancer = advancer; - } - - @Override - public void execute() { - if (clock.nowMillis() >= time + period) { - advancer.run(); - time = clock.nowMillis(); - } - } - } - - @Test - public void testLowResClock() { - final WaitingFakeClock clock = new WaitingFakeClock(); - final long start = clock.nowMillis(); - - ScheduledExecutorService mockExecutor = createMock(ScheduledExecutorService.class); - final Capture<Runnable> runnable = new Capture<Runnable>(); - final Capture<Long> period = new Capture<Long>(); - mockExecutor.scheduleAtFixedRate(capture(runnable), anyLong(), captureLong(period), - eq(TimeUnit.MILLISECONDS)); - - expectLastCall().andAnswer(new IAnswer<ScheduledFuture<?>>() { - public ScheduledFuture<?> answer() { - final Thread ticker = new Thread() { - @Override - public void run() { - Tick tick = new Tick(clock, start, period.getValue(), runnable.getValue()); - try { - while (true) { - clock.doOnAdvance(tick); - } - } catch (InterruptedException e) { - /* terminate */ - } - } - }; - ticker.setDaemon(true); - ticker.start(); - final ScheduledFuture<?> future = createMock(ScheduledFuture.class); - final AtomicBoolean stopped = new AtomicBoolean(false); - expect(future.isCancelled()).andAnswer(new IAnswer<Boolean>() { - @Override - public Boolean answer() throws Throwable { - return stopped.get(); - } - }).anyTimes(); - expect(future.cancel(anyBoolean())).andAnswer(new IAnswer<Boolean>() { - @Override - public Boolean answer() throws Throwable { - ticker.interrupt(); - stopped.set(true); - return true; - } - }); - replay(future); - return future; - } - }); - replay(mockExecutor); - - LowResClock lowRes = new LowResClock(Amount.of(1L, Time.SECONDS), mockExecutor, clock); - - long t = lowRes.nowMillis(); - clock.advance(Amount.of(100L, Time.MILLISECONDS)); - assertEquals(t, lowRes.nowMillis()); - - clock.advance(Amount.of(900L, Time.MILLISECONDS)); - assertEquals(t + 1000, lowRes.nowMillis()); - - clock.advance(Amount.of(100L, Time.MILLISECONDS)); - assertEquals(t + 1000, lowRes.nowMillis()); - - lowRes.close(); - try { - lowRes.nowMillis(); - fail("Closed clock should throw exception!"); - } catch (IllegalStateException e) { - /* expected */ - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/util/QueueDrainerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/QueueDrainerTest.java b/commons/src/test/java/org/apache/aurora/common/util/QueueDrainerTest.java deleted file mode 100644 index 66082b8..0000000 --- a/commons/src/test/java/org/apache/aurora/common/util/QueueDrainerTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; - -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; - -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; - -/** - * @author Srinivasan Rajagopal - */ -public class QueueDrainerTest extends EasyMockTest { - private Executor taskExecutor; - private BlockingQueue<RetryingRunnable> blockingQueue; - private QueueDrainer queueDrainer; - - @Before - public void setUp() { - taskExecutor = createMock(Executor.class); - blockingQueue = createMock(new Clazz<BlockingQueue<RetryingRunnable>>() { }); - queueDrainer = new QueueDrainer<RetryingRunnable>(taskExecutor, blockingQueue); - } - - @Test - public void testDrainsQueue() throws Exception { - RetryingRunnable task = createMock(RetryingRunnable.class); - expect(blockingQueue.poll()).andReturn(task); - taskExecutor.execute(task); - control.replay(); - replay(); - queueDrainer.run(); - } - - @Test - public void testEmptyQueue() throws Exception { - expect(blockingQueue.poll()).andReturn(null); - control.replay(); - replay(); - queueDrainer.run(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/util/RateLimitedCommandExecutorTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/RateLimitedCommandExecutorTest.java b/commons/src/test/java/org/apache/aurora/common/util/RateLimitedCommandExecutorTest.java deleted file mode 100644 index 245e07d..0000000 --- a/commons/src/test/java/org/apache/aurora/common/util/RateLimitedCommandExecutorTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.easymock.Capture; -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; - -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expectLastCall; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * @author Srinivasan Rajagopal - */ -public class RateLimitedCommandExecutorTest extends EasyMockTest { - private ScheduledExecutorService taskExecutor; - private Amount<Long, Time> intervalBetweenRequests; - private RateLimitedCommandExecutor rateLimiter; - private Command command; - private BlockingQueue<RetryingRunnable<?>> queue; - private Runnable queueDrainer; - - @Before - public void setUp() throws Exception { - command = createMock(Command.class); - taskExecutor = createMock(ScheduledExecutorService.class); - queue = createMock(new Clazz<BlockingQueue<RetryingRunnable<?>>>() {}); - queueDrainer = createMock(Runnable.class); - intervalBetweenRequests = Amount.of(100L, Time.MILLISECONDS); - } - - private RateLimitedCommandExecutor createLimiter() { - return new RateLimitedCommandExecutor( - taskExecutor, - intervalBetweenRequests, - queueDrainer, - queue); - } - - @Test - public void testFixedRateClientDequeueIsInvoked() throws Exception { - Capture<Runnable> runnableCapture = createCapture(); - expect(taskExecutor.scheduleWithFixedDelay( - capture(runnableCapture), - eq(0L), - eq((long) intervalBetweenRequests.as(Time.MILLISECONDS)), - eq(TimeUnit.MILLISECONDS))).andReturn(null); - control.replay(); - - rateLimiter = createLimiter(); - assertTrue(runnableCapture.hasCaptured()); - assertNotNull(runnableCapture.getValue()); - } - - @Test - public void testEnqueue() throws Exception { - expect(taskExecutor.scheduleWithFixedDelay((Runnable) anyObject(), - eq(0L), - eq((long) intervalBetweenRequests.as(Time.MILLISECONDS)), - eq(TimeUnit.MILLISECONDS))).andReturn(null); - - Capture<RetryingRunnable> runnableTaskCapture = createCapture(); - expect(queue.add(capture(runnableTaskCapture))).andReturn(true); - control.replay(); - - rateLimiter = createLimiter(); - rateLimiter.execute("name", command, RuntimeException.class, 1, intervalBetweenRequests); - assertTrue(runnableTaskCapture.hasCaptured()); - } - - @Test - public void testDrainQueueCommandHandlesException() { - Capture<Runnable> runnableCapture = createCapture(); - expect(taskExecutor.scheduleWithFixedDelay( - capture(runnableCapture), - eq(0L), - eq((long) intervalBetweenRequests.as(Time.MILLISECONDS)), - eq(TimeUnit.MILLISECONDS))).andReturn(null); - queueDrainer.run(); - expectLastCall().andThrow(new RuntimeException()); - - control.replay(); - rateLimiter = createLimiter(); - - //Execute the runnable to ensure the exception does not propagate - // and potentially kill threads in the executor service. - runnableCapture.getValue().run(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/util/caching/CachingMethodProxyTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/caching/CachingMethodProxyTest.java b/commons/src/test/java/org/apache/aurora/common/util/caching/CachingMethodProxyTest.java deleted file mode 100644 index 4c3b248..0000000 --- a/commons/src/test/java/org/apache/aurora/common/util/caching/CachingMethodProxyTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util.caching; - -import com.google.common.base.Predicate; -import org.easymock.IMocksControl; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; - -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; - -/** - * @author William Farner - */ -public class CachingMethodProxyTest { - - private CachingMethodProxy<Math> proxyBuilder; - private Math uncachedMath; - private Math cachedMath; - private Cache<List, Integer> intCache; - private Predicate<Integer> intFilter; - - private IMocksControl control; - - @Before - @SuppressWarnings("unchecked") - public void setUp() { - control = createControl(); - uncachedMath = control.createMock(Math.class); - intCache = control.createMock(Cache.class); - intFilter = control.createMock(Predicate.class); - - proxyBuilder = CachingMethodProxy.proxyFor(uncachedMath, Math.class); - cachedMath = proxyBuilder.getCachingProxy(); - } - - @After - public void verifyControl() { - control.verify(); - } - - @Test - public void testCaches() throws Exception { - expectUncachedAdd(1, 2, true); - expectUncachedAdd(3, 4, true); - expect(intCache.get(Arrays.asList(1, 2))).andReturn(3); - expect(intCache.get(Arrays.asList(3, 4))).andReturn(7); - - control.replay(); - - proxyBuilder.cache(cachedMath.sum(0, 0), intCache, intFilter) - .prepare(); - assertThat(cachedMath.sum(1, 2), is(3)); - assertThat(cachedMath.sum(3, 4), is(7)); - assertThat(cachedMath.sum(1, 2), is(3)); - assertThat(cachedMath.sum(3, 4), is(7)); - } - - @Test - public void testIgnoresUncachedMethod() throws Exception { - expect(uncachedMath.sub(2, 1)).andReturn(1); - expect(uncachedMath.sub(2, 1)).andReturn(1); - - control.replay(); - - proxyBuilder.cache(cachedMath.sum(0, 0), intCache, intFilter) - .prepare(); - assertThat(cachedMath.sub(2, 1), is(1)); - assertThat(cachedMath.sub(2, 1), is(1)); - } - - @Test - public void testFilterValue() throws Exception { - expectUncachedAdd(1, 2, true); - expectUncachedAdd(3, 4, false); - expect(intCache.get(Arrays.asList(1, 2))).andReturn(3); - - control.replay(); - - proxyBuilder.cache(cachedMath.sum(0, 0), intCache, intFilter) - .prepare(); - assertThat(cachedMath.sum(1, 2), is(3)); - assertThat(cachedMath.sum(3, 4), is(7)); - assertThat(cachedMath.sum(1, 2), is(3)); - } - - @Test(expected = IllegalStateException.class) - public void testRequiresOneCache() throws Exception { - control.replay(); - - proxyBuilder.prepare(); - } - - @Test - public void testExceptionThrown() throws Exception { - List<Integer> args = Arrays.asList(1, 2); - expect(intCache.get(args)).andReturn(null); - - Math.AddException thrown = new Math.AddException(); - expect(uncachedMath.sum(1, 2)).andThrow(thrown); - - control.replay(); - - proxyBuilder.cache(cachedMath.sum(0, 0), intCache, intFilter) - .prepare(); - try { - cachedMath.sum(1, 2); - } catch (Math.AddException e) { - assertSame(e, thrown); - } - } - - /* TODO(William Farner): Re-enable once the TODO for checking return value/cache value types is done. - @Test(expected = IllegalArgumentException.class) - public void testCacheValueAndMethodReturnTypeMismatch() throws Exception { - control.replay(); - - cachedMath.addDouble(0, 0); - proxyBuilder.cache(1, intCache, intFilter) - .prepare(); - } - */ - - @Test(expected = IllegalStateException.class) - public void testRejectsCacheSetupAfterPrepare() throws Exception { - control.replay(); - - proxyBuilder.cache(cachedMath.sum(0, 0), intCache, intFilter) - .prepare(); - proxyBuilder.cache(null, intCache, intFilter); - } - - @Test - @SuppressWarnings("unchecked") - public void testIgnoresNullValues() throws Exception { - // Null return values should not even be considered for entry into the cache, and therefore - // should not be passed to the filter. - - Cache<List, Math> crazyCache = control.createMock(Cache.class); - Predicate<Math> crazyFilter = control.createMock(Predicate.class); - - expect(crazyCache.get(Arrays.asList(null, null))).andReturn(null); - expect(uncachedMath.crazyMath(null, null)).andReturn(null); - - control.replay(); - - proxyBuilder.cache(cachedMath.crazyMath(null, null), crazyCache, crazyFilter) - .prepare(); - - cachedMath.crazyMath(null, null); - } - - @Test(expected = IllegalArgumentException.class) - @SuppressWarnings("unchecked") - public void testRejectsVoidReturn() throws Exception { - Cache<List, Void> voidCache = control.createMock(Cache.class); - Predicate<Void> voidFilter = control.createMock(Predicate.class); - - control.replay(); - - cachedMath.doSomething(null); - proxyBuilder.cache(null, voidCache, voidFilter); - } - - @Test(expected = IllegalStateException.class) - @SuppressWarnings("unchecked") - public void testFailsNoCachedCall() throws Exception { - Cache<List, Void> voidCache = control.createMock(Cache.class); - Predicate<Void> voidFilter = control.createMock(Predicate.class); - - control.replay(); - - // No method call was recorded on the proxy, so the builder doesn't know what to cache. - proxyBuilder.cache(null, voidCache, voidFilter); - } - - @Test(expected = IllegalArgumentException.class) - @SuppressWarnings("unchecked") - public void testRejectsZeroArgMethods() throws Exception { - Cache<List, Math> mathCache = control.createMock(Cache.class); - Predicate<Math> mathFilter = control.createMock(Predicate.class); - - control.replay(); - - proxyBuilder.cache(cachedMath.doNothing(), mathCache, mathFilter); - } - - @Test - public void testAllowsSuperclassMethod() throws Exception { - SubMath subMath = control.createMock(SubMath.class); - - List<Integer> args = Arrays.asList(1, 2); - expect(intCache.get(args)).andReturn(null); - expect(subMath.sum(1, 2)).andReturn(3); - expect(intFilter.apply(3)).andReturn(true); - intCache.put(args, 3); - - control.replay(); - - Method add = SubMath.class.getMethod("sum", int.class, int.class); - - CachingMethodProxy<SubMath> proxyBuilder = CachingMethodProxy.proxyFor(subMath, SubMath.class); - SubMath cached = proxyBuilder.getCachingProxy(); - proxyBuilder.cache(cached.sum(0, 0), intCache, intFilter) - .prepare(); - - cached.sum(1, 2); - } - - private void expectUncachedAdd(int a, int b, boolean addToCache) throws Math.AddException { - List<Integer> args = Arrays.asList(a, b); - expect(intCache.get(args)).andReturn(null); - expect(uncachedMath.sum(a, b)).andReturn(a + b); - expect(intFilter.apply(a + b)).andReturn(addToCache); - if (addToCache) intCache.put(args, a + b); - } - - private interface Math { - public int sum(int a, int b) throws AddException; - - public double addDouble(double a, double b) throws AddException; - - public int sub(int a, int b); - - public Math crazyMath(Math a, Math b); - - public Math doNothing(); - - public void doSomething(Math a); - - class AddException extends Exception {} - } - - private interface SubMath extends Math { - public int otherSum(int a, int b); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/util/caching/LRUCacheTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/caching/LRUCacheTest.java b/commons/src/test/java/org/apache/aurora/common/util/caching/LRUCacheTest.java deleted file mode 100644 index e1a51d2..0000000 --- a/commons/src/test/java/org/apache/aurora/common/util/caching/LRUCacheTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util.caching; - -import com.google.common.collect.Lists; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.collections.Pair; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; - -/** - * Tests the LRUCache class. - * - * @author William Farner - */ -public class LRUCacheTest { - - @Test - public void testEvicts() { - int cacheSize = 10; - int inserts = 100; - LRUCache<Integer, Integer> cache = LRUCache.<Integer, Integer>builder() - .maxSize(cacheSize) - .build(); - for (int i = 0; i < inserts; i++) { - cache.put(i, i); - assertThat(cache.size(), is(Math.min(i + 1, cacheSize))); - } - } - - @Test - public void testEvictsLRU() { - int cacheSize = 10; - - final List<Integer> evictedKeys = Lists.newLinkedList(); - - Closure<Pair<Integer, Integer>> listener = new Closure<Pair<Integer, Integer>>() { - @Override public void execute(Pair<Integer, Integer> evicted) { - evictedKeys.add(evicted.getFirst()); - } - }; - - LRUCache<Integer, Integer> cache = LRUCache.<Integer, Integer>builder() - .maxSize(cacheSize) - .evictionListener(listener) - .build(); - - for (int i = 0; i < cacheSize; i++) { - cache.put(i, i); - } - - // Access all elements except '3'. - for (int access : Arrays.asList(0, 7, 2, 8, 4, 6, 9, 1, 5)) { - assertNotNull(cache.get(access)); - } - - assertThat(evictedKeys.isEmpty(), is(true)); - - // This should trigger the eviction. - cache.put(cacheSize + 1, cacheSize + 1); - - assertThat(evictedKeys, is(Arrays.asList(3))); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/CompoundServerSetTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CompoundServerSetTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CompoundServerSetTest.java deleted file mode 100644 index a06a814..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CompoundServerSetTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.thrift.ServiceInstance; - -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.getCurrentArguments; - -public class CompoundServerSetTest extends EasyMockTest { - private static final Map<String, InetSocketAddress> AUX_PORTS = ImmutableMap.of(); - private static final InetSocketAddress END_POINT = - InetSocketAddress.createUnresolved("foo", 12345); - - private ServerSet.EndpointStatus mockStatus1; - private ServerSet.EndpointStatus mockStatus2; - private ServerSet.EndpointStatus mockStatus3; - private HostChangeMonitor<ServiceInstance> compoundMonitor; - - private ServerSet serverSet1; - private ServerSet serverSet2; - private ServerSet serverSet3; - private Command stop1; - private Command stop2; - private Command stop3; - private CompoundServerSet compoundServerSet; - - private ServiceInstance instance1; - private ServiceInstance instance2; - private ServiceInstance instance3; - - private void triggerChange(ServiceInstance... hostChanges) { - compoundMonitor.onChange(ImmutableSet.copyOf(hostChanges)); - } - - private void triggerChange( - Capture<HostChangeMonitor<ServiceInstance>> capture, - ServiceInstance... hostChanges) { - - capture.getValue().onChange(ImmutableSet.copyOf(hostChanges)); - } - - @Before - public void setUpMocks() throws Exception { - control = createControl(); - compoundMonitor = createMock(new Clazz<HostChangeMonitor<ServiceInstance>>() { }); - - mockStatus1 = createMock(ServerSet.EndpointStatus.class); - mockStatus2 = createMock(ServerSet.EndpointStatus.class); - mockStatus3 = createMock(ServerSet.EndpointStatus.class); - - serverSet1 = createMock(ServerSet.class); - serverSet2 = createMock(ServerSet.class); - serverSet3 = createMock(ServerSet.class); - - stop1 = createMock(Command.class); - stop2 = createMock(Command.class); - stop3 = createMock(Command.class); - - instance1 = createMock(ServiceInstance.class); - instance2 = createMock(ServiceInstance.class); - instance3 = createMock(ServiceInstance.class); - - compoundServerSet = new CompoundServerSet(ImmutableList.of(serverSet1, serverSet2, serverSet3)); - } - - @Test - public void testJoin() throws Exception { - expect(serverSet1.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus1); - expect(serverSet2.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus2); - expect(serverSet3.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus3); - - mockStatus1.leave(); - mockStatus2.leave(); - mockStatus3.leave(); - - control.replay(); - - compoundServerSet.join(END_POINT, AUX_PORTS, 0).leave(); - } - - @Test(expected = Group.JoinException.class) - public void testJoinFailure() throws Exception { - // Throw exception for the first serverSet join. - expect(serverSet1.join(END_POINT, AUX_PORTS)) - .andThrow(new Group.JoinException("Group join exception", null)); - - control.replay(); - compoundServerSet.join(END_POINT, AUX_PORTS); - } - - @Test(expected = ServerSet.UpdateException.class) - public void testStatusUpdateFailure() throws Exception { - expect(serverSet1.join(END_POINT, AUX_PORTS)).andReturn(mockStatus1); - expect(serverSet2.join(END_POINT, AUX_PORTS)).andReturn(mockStatus2); - expect(serverSet3.join(END_POINT, AUX_PORTS)).andReturn(mockStatus3); - - mockStatus1.leave(); - mockStatus2.leave(); - expectLastCall().andThrow(new ServerSet.UpdateException("Update exception")); - mockStatus3.leave(); - - control.replay(); - - compoundServerSet.join(END_POINT, AUX_PORTS).leave(); - } - - @Test - public void testMonitor() throws Exception { - Capture<HostChangeMonitor<ServiceInstance>> set1Capture = createCapture(); - Capture<HostChangeMonitor<ServiceInstance>> set2Capture = createCapture(); - Capture<HostChangeMonitor<ServiceInstance>> set3Capture = createCapture(); - - expect(serverSet1.watch( - EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set1Capture))) - .andReturn(stop1); - expect(serverSet2.watch( - EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set2Capture))) - .andReturn(stop2); - expect(serverSet3.watch( - EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set3Capture))) - .andReturn(stop3); - - triggerChange(instance1); - triggerChange(instance1, instance2); - triggerChange(instance1, instance2, instance3); - triggerChange(instance1, instance3); - triggerChange(instance1, instance2, instance3); - triggerChange(instance3); - triggerChange(); - - control.replay(); - compoundServerSet.watch(compoundMonitor); - - // No new instances. - triggerChange(set1Capture); - triggerChange(set2Capture); - triggerChange(set3Capture); - // Add one instance from each serverset - triggerChange(set1Capture, instance1); - triggerChange(set2Capture, instance2); - triggerChange(set3Capture, instance3); - // Remove instance2 - triggerChange(set2Capture); - // instance1 in both serverset1 and serverset2 - triggerChange(set2Capture, instance1, instance2); - // Remove instances from serversets. - triggerChange(set1Capture); - triggerChange(set2Capture); - triggerChange(set3Capture); - } - - @Test(expected = MonitorException.class) - public void testMonitorFailure() throws Exception { - serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()); - expectLastCall().andThrow(new MonitorException("Monitor exception", null)); - - control.replay(); - compoundServerSet.watch(compoundMonitor); - } - - @Test - public void testInitialChange() throws Exception { - // Ensures that a synchronous change notification during the call to monitor() is properly - // reported. - serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()); - expectLastCall().andAnswer(new IAnswer<Command>() { - @Override public Command answer() { - @SuppressWarnings("unchecked") - HostChangeMonitor<ServiceInstance> monitor = - (HostChangeMonitor<ServiceInstance>) getCurrentArguments()[0]; - monitor.onChange(ImmutableSet.of(instance1, instance2)); - return stop1; - } - }); - compoundMonitor.onChange(ImmutableSet.of(instance1, instance2)); - expect(serverSet2.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject())) - .andReturn(stop2); - expect(serverSet3.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject())) - .andReturn(stop3); - - control.replay(); - - compoundServerSet.watch(compoundMonitor); - } - - @Test - public void testStopMonitoring() throws Exception { - expect(serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject())) - .andReturn(stop1); - expect(serverSet2.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject())) - .andReturn(stop2); - expect(serverSet3.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject())) - .andReturn(stop3); - - stop1.execute(); - stop2.execute(); - stop3.execute(); - - control.replay(); - compoundServerSet.watch(compoundMonitor).execute(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/test/java/org/apache/aurora/common/zookeeper/DistributedLockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/DistributedLockTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/DistributedLockTest.java deleted file mode 100644 index 00a6648..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/DistributedLockTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; - -/** - * @author Florian Leibert - */ -public class DistributedLockTest extends BaseZooKeeperTest { - private static final String LOCK_PATH = "/test/lock"; - - private ZooKeeperClient zkClient; - - @Before - public void mySetUp() throws Exception { - zkClient = createZkClient(); - - } - - @Test - public void testFailDoubleLock() { - DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH); - lock.lock(); - try { - lock.lock(); - fail("Exception expected!"); - } catch (DistributedLockImpl.LockingException e) { - // expected - } finally { - lock.unlock(); - } - } - - @Test - public void testFailUnlock() { - DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH); - try { - lock.unlock(); - fail("Expected exception while trying to unlock!"); - } catch (DistributedLockImpl.LockingException e) { - // success - } - } - - @Test - public void testTwoLocks() { - DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH); - DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH); - lock1.lock(); - List<String> children = expectZkNodes(LOCK_PATH); - assertEquals("One child == lock held!", children.size(), 1); - lock1.unlock(); - // check no locks held/empty children - children = expectZkNodes(LOCK_PATH); - assertEquals("No children, no lock held!", children.size(), 0); - lock2.lock(); - children = expectZkNodes(LOCK_PATH); - assertEquals("One child == lock held!", children.size(), 1); - lock2.unlock(); - } - - @Test - public void testTwoLocksFailFast() { - DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH); - DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH); - lock1.lock(); - boolean acquired = lock2.tryLock(10, TimeUnit.MILLISECONDS); - assertFalse("Couldn't acquire lock because it's currently held", acquired); - lock1.unlock(); - lock2.unlock(); - } - - @Test - @Ignore("pending: <http://jira.local.twitter.com/browse/RESEARCH-49>") - public void testMultiConcurrentLocking() throws Exception { - //TODO(Florian Leibert): this is a bit janky, so let's replace it. - for (int i = 0; i < 50; i++) { - testConcurrentLocking(); - } - mySetUp(); - } - - @Test - public void testConcurrentLocking() throws Exception { - ZooKeeperClient zk1 = createZkClient(); - ZooKeeperClient zk2 = createZkClient(); - ZooKeeperClient zk3 = createZkClient(); - - final DistributedLock lock1 = new DistributedLockImpl(zk1, LOCK_PATH); - final DistributedLock lock2 = new DistributedLockImpl(zk2, LOCK_PATH); - final DistributedLock lock3 = new DistributedLockImpl(zk3, LOCK_PATH); - Callable<Object> t1 = new Callable<Object>() { - @Override - public Object call() throws InterruptedException { - lock1.lock(); - try { - Thread.sleep(50); - } finally { - lock1.unlock(); - } - return new Object(); - } - }; - - Callable<Object> t2 = new Callable<Object>() { - @Override - public Object call() throws InterruptedException { - lock2.lock(); - try { - Thread.sleep(50); - } finally { - lock2.unlock(); - } - return new Object(); - } - }; - - Callable<Object> t3 = new Callable<Object>() { - @Override - public Object call() throws InterruptedException { - lock3.lock(); - try { - Thread.sleep(50); - } finally { - lock3.unlock(); - } - return new Object(); - } - }; - - //TODO(Florian Leibert): remove this executors stuff and use a latch instead. - ExecutorService ex = Executors.newCachedThreadPool(); - @SuppressWarnings("unchecked") List<Callable<Object>> tlist = Arrays.asList(t1, t2, t3); - ex.invokeAll(tlist); - assertTrue("No Children left!", expectZkNodes(LOCK_PATH).size() == 0); - } - - protected List<String> expectZkNodes(String path) { - try { - List<String> children = zkClient.get().getChildren(path, null); - return children; - } catch (Exception e) { - throw new RuntimeException(e); - } - } -}
