Removed queue from NamedTaskExecutor and made it process one event at a time. Added rejected-task tests to the entity event tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a49b4449 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a49b4449 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a49b4449 Branch: refs/heads/UG-rest-test-framework-overhaul Commit: a49b444961c0fc498b2bee0f396c8ca56b199a31 Parents: e9f876c Author: grey <[email protected]> Authored: Tue Nov 4 12:12:24 2014 -0800 Committer: grey <[email protected]> Committed: Tue Nov 4 12:12:24 2014 -0800 ---------------------------------------------------------------------- .../impl/EntityVersionCleanupTaskTest.java | 129 ++++++------------- .../impl/EntityVersionCreatedTaskTest.java | 46 +++++++ .../core/task/NamedTaskExecutorImpl.java | 114 +++++++++++++++- .../core/task/NamedTaskExecutorImplTest.java | 32 +++++ 4 files changed, 228 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java index 621db20..0e86027 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java @@ -639,168 +639,123 @@ public class EntityVersionCleanupTaskTest { latch.await(); } - /** * Tests that our task will run in the caller if 
there's no threads, ensures that the task runs */ - @Ignore("Test is a work in progress") @Test(timeout=10000) - public void runsWhenRejected() + public void singleListenerSingleVersionRejected() throws ExecutionException, InterruptedException, ConnectionException { - /** - * only 1 thread on purpose, we want to saturate the task - */ - final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 ); + final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 ); final SerializationFig serializationFig = mock( SerializationFig.class ); when( serializationFig.getBufferSize() ).thenReturn( 10 ); - final MvccEntitySerializationStrategy mvccEntitySerializationStrategy = + final MvccEntitySerializationStrategy ess = mock( MvccEntitySerializationStrategy.class ); final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy = mock( MvccLogEntrySerializationStrategy.class ); - final Keyspace keyspace1 = mock( Keyspace.class ); - final Keyspace keyspace2 = mock( Keyspace.class ); + final Keyspace keyspace = mock( Keyspace.class ); final MutationBatch entityBatch = mock( MutationBatch.class ); final MutationBatch logBatch = mock( MutationBatch.class ); - when( keyspace1.prepareMutationBatch() ) - .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ) - .thenReturn( logBatch ); + when( keyspace.prepareMutationBatch() ) + .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one + .thenReturn( entityBatch ) + .thenReturn( logBatch ); - when( keyspace2.prepareMutationBatch() ) - .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ) - .thenReturn( logBatch ); //create a latch for the event listener, and add it to the list of events - final int sizeToReturn = 10; - - - final int listenerCount = 2; + final int sizeToReturn = 1; - final CountDownLatch latch = new CountDownLatch( - 
sizeToReturn/serializationFig.getBufferSize() * listenerCount ); - final Semaphore waitSemaphore = new Semaphore( 0 ); + final CountDownLatch latch = new CountDownLatch( sizeToReturn ); + final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch ); - final SlowListener slowListener = new SlowListener( latch, waitSemaphore ); - final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch ); + final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>(); + listeners.add( eventListener ); final Id applicationId = new SimpleId( "application" ); - final CollectionScope appScope = new CollectionScopeImpl( + final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" ); final Id entityId = new SimpleId( "user" ); //mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( + final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 ); final UUID version = logEntryMock.getEntries().iterator().next().getVersion(); + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy = mock( UniqueValueSerializationStrategy.class ); - - EntityVersionCleanupTask firstTask = new EntityVersionCleanupTask( - serializationFig, - mvccLogEntrySerializationStrategy, - mvccEntitySerializationStrategy, - uniqueValueSerializationStrategy, - keyspace1, - Sets.newSet( (EntityVersionDeleted)runListener ), - appScope, - entityId, - version ); - - - //change the listeners to one that is just invoked quickly - - - EntityVersionCleanupTask secondTask = new EntityVersionCleanupTask( - serializationFig, - mvccLogEntrySerializationStrategy, - mvccEntitySerializationStrategy, - uniqueValueSerializationStrategy, - keyspace2, - Sets.newSet( (EntityVersionDeleted)runListener ), - appScope, - entityId, - version ); - + EntityVersionCleanupTask cleanupTask = + new 
EntityVersionCleanupTask( serializationFig, + mvccLogEntrySerializationStrategy, + ess, + uniqueValueSerializationStrategy, + keyspace, + listeners, + appScope, + entityId, + version + ); final MutationBatch batch = mock( MutationBatch.class ); //set up returning a mutator - when( mvccEntitySerializationStrategy - .delete( same( appScope ), same( entityId ), any( UUID.class ) ) ) + when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ) .thenReturn( batch ); - when( mvccLogEntrySerializationStrategy .delete( same( appScope ), same( entityId ), any( UUID.class ) ) ) .thenReturn( batch ); - //start the task - ListenableFuture<Void> future1 = taskExecutor.submit( firstTask ); + final List<MvccEntity> mel = new ArrayList<MvccEntity>(); - //now start another task while the slow running task is running - ListenableFuture<Void> future2 = taskExecutor.submit( secondTask ); + mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), + MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) ); - //get the second task, we shouldn't have been able to queue it, - // therefore it should just run in process - future2.get(); + mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), + MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) ); - /** - * While we're not done, release latches every 200 ms - */ - while ( !future1.isDone() ) { - Thread.sleep( 200 ); - waitSemaphore.release( listenerCount ); - } + when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) ) + .thenReturn(mel.iterator() ); - //wait for the task - future1.get(); - //we deleted the version - //verify we deleted everything + //start the task + ListenableFuture<Void> future = taskExecutor.submit( cleanupTask ); + //wait for the task + future.get(); //we deleted the version - //verify we deleted everything - verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) ); - - verify( entityBatch, 
times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) ); - - - verify( logBatch, times(2) ).execute(); - - verify( entityBatch, times(2) ).execute(); + //verify it was run + verify( entityBatch ).execute(); + verify( logBatch ).execute(); //the latch was executed latch.await(); } - private static class EntityVersionDeletedTest implements EntityVersionDeleted { final CountDownLatch invocationLatch; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java index 9d72665..24ea280 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java @@ -200,6 +200,52 @@ public class EntityVersionCreatedTaskTest { verify( listeners ).iterator(); } + @Test(timeout=10000) + public void oneListenerRejected() + throws ExecutionException, InterruptedException, ConnectionException { + + // create a latch for the event listener, and add it to the list of events + + final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 ); + + final int sizeToReturn = 1; + + final CountDownLatch latch = new CountDownLatch( sizeToReturn ); + + final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch); + + final Set<EntityVersionCreated> listeners = mock( Set.class ); + final Iterator<EntityVersionCreated> helper = mock(Iterator.class); + + when ( 
listeners.size()).thenReturn( 1 ); + when ( listeners.iterator()).thenReturn( helper ); + when ( helper.next() ).thenReturn( eventListener ); + + final Id applicationId = new SimpleId( "application" ); + + final CollectionScope appScope = new CollectionScopeImpl( + applicationId, applicationId, "users" ); + + final Id entityId = new SimpleId( "user" ); + final Entity entity = new Entity( entityId ); + + // start the task + + EntityVersionCreatedTask entityVersionCreatedTask = + new EntityVersionCreatedTask( appScope, listeners, entity); + + ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask ); + + // wait for the task + future.get(); + + //mocked listener makes sure that the task is called + verify( listeners ).size(); + verify( listeners ).iterator(); + verify( helper ).next(); + + } + private static class EntityVersionCreatedTest implements EntityVersionCreated { final CountDownLatch invocationLatch; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java index b18687a..a022c08 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java @@ -1,16 +1,24 @@ package org.apache.usergrid.persistence.core.task; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -36,7 +44,6 @@ public class NamedTaskExecutorImpl implements TaskExecutor { private final String name; private final int poolSize; - private final int queueLength; /** @@ -47,17 +54,24 @@ public class NamedTaskExecutorImpl implements TaskExecutor { public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) { Preconditions.checkNotNull( name ); Preconditions.checkArgument( name.length() > 0, "name must have a length" ); - Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" ); + Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" ); Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" ); this.name = name; this.poolSize = poolSize; - this.queueLength = queueLength; - final BlockingQueue<Runnable> queue = - queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>(); + //The user has chosen to disable asynchronous execution, to create an executor service that will reject all requests + if(poolSize == 0){ + executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService()); + } + + //queue executions as normal + else { + final BlockingQueue<Runnable> queue = + queueLength > 0 ? 
new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>(); - executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) ); + executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) ); + } } @@ -164,4 +178,92 @@ public class NamedTaskExecutorImpl implements TaskExecutor { } } + + + /** + * Executor implementation that simply rejects all incoming tasks + */ + private static final class RejectingExecutorService implements ExecutorService{ + + @Override + public void shutdown() { + + } + + + @Override + public List<Runnable> shutdownNow() { + return Collections.EMPTY_LIST; + } + + + @Override + public boolean isShutdown() { + return false; + } + + + @Override + public boolean isTerminated() { + return false; + } + + + @Override + public boolean awaitTermination( final long timeout, final TimeUnit unit ) throws InterruptedException { + return false; + } + + + @Override + public <T> Future<T> submit( final Callable<T> task ) { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public <T> Future<T> submit( final Runnable task, final T result ) { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public Future<?> submit( final Runnable task ) { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks ) + throws InterruptedException { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, final long timeout, + final TimeUnit unit ) throws InterruptedException { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public <T> T invokeAny( final Collection<? 
extends Callable<T>> tasks ) + throws InterruptedException, ExecutionException { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit ) + throws InterruptedException, ExecutionException, TimeoutException { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + + + @Override + public void execute( final Runnable command ) { + throw new RejectedExecutionException("No Asynchronous tasks allowed"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java index 34f57e5..65189f1 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java @@ -185,6 +185,38 @@ public class NamedTaskExecutorImplTest { } + @Test + public void rejectingTaskExecutor() throws InterruptedException { + + final int threadPoolSize = 0; + final int queueSize = 0; + + final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize ); + + final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); + final CountDownLatch rejectedLatch = new CountDownLatch( 1 ); + final CountDownLatch runLatch = new CountDownLatch( 0 ); + + + //now submit the second task + + + + final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) 
{}; + + executor.submit( testTask ); + + + //should be immediately rejected + rejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); + + //if we get here we've been rejected, just double check we didn't run + + assertEquals( 0l, exceptionLatch.getCount() ); + assertEquals( 0l, runLatch.getCount() ); + } + + private static abstract class TestTask<V> implements Task<V> { private final List<Throwable> exceptions;
