http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java new file mode 100644 index 0000000..c1f76f2 --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java @@ -0,0 +1,712 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; + +import org.junit.AfterClass; +import org.junit.Test; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.collection.util.LogEntryMock; +import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock; +import org.apache.usergrid.persistence.collection.util.VersionGenerator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import com.google.common.util.concurrent.ListenableFuture; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class UniqueCleanupTest { + +// +// private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 ); +// +// +// @AfterClass +// public static void shutdown() { +// taskExecutor.shutdown(); +// } +// +// +// @Test( timeout = 10000 ) +// public void noListenerOneVersion() throws Exception { +// +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// // intentionally no events +// final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>(); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// final List<UUID> versions = VersionGenerator.generateVersions( 2 ); +// +// // mock up a single log entry for our first test +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, +// version, false ); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// cleanupTask.call(); +// +// +// //get the second field, this should be deleted +// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); +// +// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); +// +// +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// } +// +// +// /** +// * Tests the cleanup task on the first version created +// */ +// @Test( timeout = 10000 ) +// public void noListenerNoVersions() throws Exception { +// +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// // intentionally no events +// final Set<EntityVersionDeleted> listeners = new HashSet<>(); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// +// final List<UUID> versions = VersionGenerator.generateVersions( 1 ); +// +// // mock up a single log entry, with no other entries +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false ); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// cleanupTask.call(); +// +// +// //verify delete was never invoked +// verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) ); +// +// //verify the delete was never invoked +// verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) ); +// } +// +// +// @Test( timeout = 10000 ) +// public void singleListenerSingleVersion() throws Exception { +// +// +// //create a latch for the event listener, and add it to the list of events +// final int sizeToReturn = 1; +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn ); +// +// final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch ); +// +// final Set<EntityVersionDeleted> listeners = new HashSet<>(); +// +// listeners.add( eventListener ); +// +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// +// final List<UUID> versions = VersionGenerator.generateVersions( 2 ); +// +// +// // mock up a single log entry for our first test +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, +// version, false ); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// cleanupTask.call(); +// +// +// //get the second field, this should be deleted +// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); +// +// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); +// +// +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// +// //the latch was executed +// latch.await(); +// } +// +// +// @Test//(timeout=10000) +// public void multipleListenerMultipleVersions() throws Exception { +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// +// //create a latch for the event listener, and add it to the list of events +// final int sizeToReturn = 10; +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 ); +// +// final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch ); +// final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch ); +// final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch ); +// +// final Set<EntityVersionDeleted> listeners = new HashSet<>(); +// +// listeners.add( listener1 ); +// listeners.add( listener2 ); +// listeners.add( listener3 ); +// +// +// +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// +// +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// final List<UUID> versions = VersionGenerator.generateVersions( 2 ); +// +// +// // mock up a single log entry for our first test +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, +// version, false ); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// cleanupTask.call(); +// +// +// //get the second field, this should be deleted +// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); +// +// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); +// +// +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// +// //the latch was executed +// latch.await(); +// +// //we deleted the version +// //verify we deleted everything +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// //the latch was executed +// latch.await(); +// } +// +// +// /** +// * Tests what happens when our listeners are VERY slow +// */ +//// @Ignore( "Test is a work in progress" ) +// @Test( timeout = 10000 ) +// public void multipleListenerMultipleVersionsNoThreadsToRun() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// +// //create a latch for the event listener, and add it to the list of events +// final int sizeToReturn = 10; +// +// +// final int listenerCount = 5; +// +// final CountDownLatch latch = +// new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount ); +// final Semaphore waitSemaphore = new Semaphore( 0 ); +// +// +// final SlowListener listener1 = new SlowListener( latch, waitSemaphore ); +// final SlowListener listener2 = new SlowListener( latch, waitSemaphore ); +// final SlowListener listener3 = new SlowListener( latch, waitSemaphore ); +// final SlowListener listener4 = new SlowListener( latch, waitSemaphore ); +// final SlowListener listener5 = new SlowListener( latch, waitSemaphore ); +// +// final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>(); +// +// listeners.add( listener1 ); +// listeners.add( listener2 ); +// listeners.add( listener3 ); +// listeners.add( listener4 ); +// listeners.add( listener5 ); +// +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// +// final List<UUID> versions = VersionGenerator.generateVersions( 2 ); +// +// // mock up a single log entry for our first test +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, +// version, false); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// //start the task +// ListenableFuture<Void> future = taskExecutor.submit( cleanupTask ); +// +// /** +// * While we're not done, release latches every 200 ms +// */ +// while ( !future.isDone() ) { +// Thread.sleep( 200 ); +// waitSemaphore.release( listenerCount ); +// } +// +// //wait for the task +// future.get(); +// +// +// //get the second field, this should be deleted +// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); +// +// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); +// +// +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// +// //the latch was executed +// latch.await(); +// +// //we deleted the version +// //verify we deleted everything +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// //the latch was executed +// latch.await(); +// +// +// //the latch was executed +// latch.await(); +// } +// +// +// /** +// * Tests that our task will run in the caller if there's no threads, ensures that the task runs +// */ +// @Test( timeout = 10000 ) +// public void singleListenerSingleVersionRejected() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// +// +// //create a latch for the event listener, and add it to the list of events +// final int sizeToReturn = 1; +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn ); +// +// final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch ); +// +// final Set<EntityVersionDeleted> listeners = new HashSet<>(); +// +// listeners.add( eventListener ); +// +// +// final SerializationFig serializationFig = mock( SerializationFig.class ); +// +// when( serializationFig.getBufferSize() ).thenReturn( 10 ); +// +// final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); +// +// final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); +// +// final Keyspace keyspace = mock( Keyspace.class ); +// +// final MutationBatch entityBatch = mock( MutationBatch.class ); +// +// when( keyspace.prepareMutationBatch() ).thenReturn( +// mock( MutationBatch.class ) ) // don't care what happens to this one +// .thenReturn( entityBatch ); +// +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); +// +// final Id entityId = new SimpleId( "user" ); +// +// +// final List<UUID> versions = VersionGenerator.generateVersions( 2 ); +// +// +// // mock up a single log entry for our first test +// final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); +// +// +// //get the version we're keeping, it's first in our list +// final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); +// +// //mock up unique version output +// final UniqueValueEntryMock uniqueValueEntryMock = +// UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); +// +// +// EntityVersionCleanupTask cleanupTask = +// new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, +// version, false ); +// +// final MutationBatch newBatch = mock( MutationBatch.class ); +// +// +// // set up returning a mutator +// when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); +// +// //return a new batch when it's called +// when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); +// +// +// cleanupTask.rejected(); +// +// +// //get the second field, this should be deleted +// final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); +// +// final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); +// +// +// //verify delete was invoked +// verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); +// +// //verify the delete was invoked +// verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); +// +// // verify it was run +// verify( entityBatch ).execute(); +// +// +// //the latch was executed +// latch.await(); +// } +// +// +// private static class EntityVersionDeletedTest implements EntityVersionDeleted { +// final CountDownLatch invocationLatch; +// +// +// private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) { +// this.invocationLatch = invocationLatch; +// } +// +// +// @Override +// public void versionDeleted( final ApplicationScope scope, final Id entityId, +// final List<MvccLogEntry> entityVersion ) { +// invocationLatch.countDown(); +// } +// } +// +// +// private static class SlowListener extends EntityVersionDeletedTest { +// final Semaphore blockLatch; +// +// +// private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) { +// super( invocationLatch ); +// this.blockLatch = blockLatch; +// } +// +// +// @Override +// public void versionDeleted( final ApplicationScope scope, final Id entityId, +// final List<MvccLogEntry> entityVersion ) { +// +// //wait for unblock to happen before counting down invocation latches +// try { +// blockLatch.acquire(); +// } +// catch ( InterruptedException e ) { +// throw new RuntimeException( e ); +// } +// super.versionDeleted( scope, entityId, entityVersion ); +// } +// } +}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java new file mode 100644 index 0000000..8ffba71 --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class VersionCompactTest { + +// private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 ); +// +// @AfterClass +// public static void shutdown() { +// taskExecutor.shutdown(); +// } +// +// +// @Test(timeout=10000) +// public void noListener() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// // create a latch for the event listener, and add it to the list of events +// +// final Set<EntityVersionCreated> listeners = mock( Set.class ); +// +// when ( listeners.size()).thenReturn( 0 ); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); +// +// final Id entityId = new SimpleId( "user" ); +// final Entity entity = new Entity( entityId ); +// +// // start the task +// +// EntityVersionCreatedTask entityVersionCreatedTask = +// new EntityVersionCreatedTask( appScope, listeners, entity); +// +// try { +// entityVersionCreatedTask.call(); +// }catch(Exception e){ +// Assert.fail( e.getMessage() ); +// } +// +// +// // wait for the task +// // future.get(); +// +// //mocked listener makes sure that the task is called +// verify( listeners ).size(); +// +// } +// @Test(timeout=10000) +// public void oneListener() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// // create a latch for the event listener, and add it to the list of events +// +// final int sizeToReturn = 1; +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn ); +// +// final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch); +// +// final Set<EntityVersionCreated> listeners = mock( Set.class ); +// final Iterator<EntityVersionCreated> helper = mock(Iterator.class); +// +// when ( listeners.size()).thenReturn( 1 ); +// when ( listeners.iterator()).thenReturn( helper ); +// when ( helper.next() ).thenReturn( eventListener ); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); +// +// final Id entityId = new SimpleId( "user" ); +// final Entity entity = new Entity( entityId ); +// +// // start the task +// +// EntityVersionCreatedTask entityVersionCreatedTask = +// new EntityVersionCreatedTask( appScope, listeners, entity); +// +// try { +// entityVersionCreatedTask.call(); +// }catch(Exception e){ +// +// Assert.fail(e.getMessage()); +// } +// //mocked listener makes sure that the task is called +// verify( listeners ).size(); +// verify( listeners ).iterator(); +// verify( helper ).next(); +// +// } +// +// @Test(timeout=10000) +// public void multipleListener() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// final int sizeToReturn = 3; +// +// final Set<EntityVersionCreated> listeners = mock( Set.class ); +// final Iterator<EntityVersionCreated> helper = mock(Iterator.class); +// +// when ( listeners.size()).thenReturn( 3 ); +// when ( listeners.iterator()).thenReturn( helper ); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); +// +// final Id entityId = new SimpleId( "user" ); +// final Entity entity = new Entity( entityId ); +// +// // start the task +// +// EntityVersionCreatedTask entityVersionCreatedTask = +// new EntityVersionCreatedTask( appScope, listeners, entity); +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn ); +// +// final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch); +// final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch); +// final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch); +// +// when ( helper.next() ).thenReturn( listener1,listener2,listener3); +// +// try { +// entityVersionCreatedTask.call(); +// }catch(Exception e){ +// ; +// } +// //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask ); +// +// //wait for the task +// //intentionally fails due to difficulty mocking observable +// +// //mocked listener makes sure that the task is called +// verify( listeners ).size(); +// //verifies that the observable made listener iterate. +// verify( listeners ).iterator(); +// } +// +// @Test(timeout=10000) +// public void oneListenerRejected() +// throws ExecutionException, InterruptedException, ConnectionException { +// +// // create a latch for the event listener, and add it to the list of events +// +// final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 ); +// +// final int sizeToReturn = 1; +// +// final CountDownLatch latch = new CountDownLatch( sizeToReturn ); +// +// final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch); +// +// final Set<EntityVersionCreated> listeners = mock( Set.class ); +// final Iterator<EntityVersionCreated> helper = mock(Iterator.class); +// +// when ( listeners.size()).thenReturn( 1 ); +// when ( listeners.iterator()).thenReturn( helper ); +// when ( helper.next() ).thenReturn( eventListener ); +// +// final Id applicationId = new SimpleId( "application" ); +// +// final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); +// +// final Id entityId = new SimpleId( "user" ); +// final Entity entity = new Entity( entityId ); +// +// // start the task +// +// EntityVersionCreatedTask entityVersionCreatedTask = +// new EntityVersionCreatedTask( appScope, listeners, entity); +// +// entityVersionCreatedTask.rejected(); +// +// //mocked listener makes sure that the task is called +// verify( listeners ).size(); +// verify( listeners ).iterator(); +// verify( helper ).next(); +// +// } +// +// private static class EntityVersionCreatedTest implements EntityVersionCreated { +// final CountDownLatch invocationLatch; +// +// private EntityVersionCreatedTest( final CountDownLatch invocationLatch) { +// this.invocationLatch = invocationLatch; +// } +// +// @Override +// public void versionCreated( final ApplicationScope scope, final Entity entity ) { +// invocationLatch.countDown(); +// } +// } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java deleted file mode 100644 index 720e602..0000000 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java +++ /dev/null @@ -1,132 +0,0 @@ -package org.apache.usergrid.persistence.collection.serialization.impl; - - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.UUID; - -import org.junit.Test; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.util.LogEntryMock; -import org.apache.usergrid.persistence.collection.util.VersionGenerator; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; - -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -/** - * Tests iterator paging - */ -public class LogEntryIteratorTest { - - - @Test - public void empty() throws ConnectionException { - - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = - mock( MvccLogEntrySerializationStrategy.class ); - - final ApplicationScope scope = - new ApplicationScopeImpl( new SimpleId( "application" )); - - final Id entityId = new SimpleId( "entity" ); - - final int pageSize = 100; - - - //set the start version, it should be discarded - UUID start = UUIDGenerator.newTimeUUID(); - - when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) ) - .thenReturn( new ArrayList<MvccLogEntry>() ); - - - //now iterate we should get everything - LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); - - - assertFalse( itr.hasNext() ); - } - - - @Test - public void partialLastPage() throws ConnectionException { - - - final int pageSize = 10; - final int totalPages = 3; - final int lastPageSize = pageSize / 2; - - //have one half page - - pageElements( pageSize, totalPages, lastPageSize ); - } - - - @Test - public void emptyLastPage() throws ConnectionException { - - - final int pageSize = 10; - final int totalPages = 3; - final int lastPageSize = 0; - - //have one half page - - pageElements( pageSize, totalPages, lastPageSize ); - } - - - public void pageElements( final int pageSize, final int totalPages, final int lastPageSize ) - throws ConnectionException { - - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = - mock( MvccLogEntrySerializationStrategy.class ); - - final ApplicationScope scope = - new ApplicationScopeImpl( new SimpleId( "application" ) ); - - final Id entityId = new SimpleId( "entity" ); - - - //have one half page - final int toGenerate = pageSize * totalPages + lastPageSize; - - - final LogEntryMock mockResults = - LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) ); - - Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator(); - - //this element should be skipped - UUID start = expectedEntries.next().getVersion(); - - //now iterate we should get everything - LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); - - - while ( expectedEntries.hasNext() && itr.hasNext() ) { - final MvccLogEntry expected = expectedEntries.next(); - - final MvccLogEntry returned = itr.next(); - - assertEquals( expected, returned ); - } - - - assertFalse( itr.hasNext() ); - assertFalse( expectedEntries.hasNext() ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java new file mode 100644 index 0000000..c82e1bf --- /dev/null +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java @@ -0,0 +1,134 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.util.LogEntryMock; +import org.apache.usergrid.persistence.collection.util.VersionGenerator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Tests iterator paging + */ +public class MinMaxLogEntryIteratorTest { + + + @Test + public void empty() throws ConnectionException { + + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = + mock( MvccLogEntrySerializationStrategy.class ); + + final ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "application" )); + + final Id entityId = new SimpleId( "entity" ); + + final int pageSize = 100; + + + //set the start version, it should be discarded + UUID start = UUIDGenerator.newTimeUUID(); + + when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) ) + .thenReturn( new ArrayList<MvccLogEntry>() ); + + + //now iterate we should get everything + MinMaxLogEntryIterator + itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + + + assertFalse( itr.hasNext() ); + } + + + @Test + public void partialLastPage() throws ConnectionException { + + + final int pageSize = 10; + final int totalPages = 3; + final int lastPageSize = pageSize / 2; + + //have one half page + + pageElements( pageSize, totalPages, lastPageSize ); + } + + + @Test + public void emptyLastPage() throws ConnectionException { + + + final int pageSize = 10; + final int totalPages = 3; + final int lastPageSize = 0; + + //have one half page + + pageElements( pageSize, totalPages, lastPageSize ); + } + + + public void pageElements( final int pageSize, final int totalPages, final int lastPageSize ) + throws ConnectionException { + + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy = + mock( MvccLogEntrySerializationStrategy.class ); + + final ApplicationScope scope = + new ApplicationScopeImpl( new SimpleId( "application" ) ); + + final Id entityId = new SimpleId( "entity" ); + + + //have one half page + final int toGenerate = pageSize * totalPages + lastPageSize; + + + final LogEntryMock mockResults = + LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) ); + + Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator(); + + //this element should be skipped + UUID start = expectedEntries.next().getVersion(); + + //now iterate we should get everything + MinMaxLogEntryIterator + itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + + + while ( expectedEntries.hasNext() && itr.hasNext() ) { + final MvccLogEntry expected = expectedEntries.next(); + + final MvccLogEntry returned = itr.next(); + + assertEquals( expected, returned ); + } + + + assertFalse( itr.hasNext() ); + assertFalse( expectedEntries.hasNext() ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java index 107b2a0..673903c 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -43,6 +44,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import com.fasterxml.uuid.UUIDComparator; import com.google.inject.Inject; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; @@ -65,15 +67,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { private MvccLogEntrySerializationStrategy logEntryStrategy; + @Before - public void wireLogEntryStrategy(){ + public void wireLogEntryStrategy() { logEntryStrategy = getLogEntryStrategy(); } /** * Get the log entry strategy from - * @return */ protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy(); @@ -181,6 +183,85 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { } + @Test + public void getReversedEntries() throws ConnectionException { + + final Id applicationId = new SimpleId( "application" ); + + ApplicationScope context = new ApplicationScopeImpl( applicationId ); + + + final Id id = new SimpleId( "test" ); + + int count = 10; + + final UUID[] versions = new UUID[count]; + final Stage COMPLETE = Stage.COMPLETE; + final MvccLogEntry[] entries = new MvccLogEntry[count]; + + + for ( int i = 0; i < count; i++ ) { + versions[i] = UUIDGenerator.newTimeUUID(); + + entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE ); + logEntryStrategy.write( context, entries[i] ).execute(); + + //Read it back + + MvccLogEntry returned = + logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id ); + + assertNotNull( "Returned value should not be null", returned ); + + assertEquals( "Returned should equal the saved", entries[i], returned ); + } + + + final UUID[] assertVersions = Arrays.copyOf( versions, versions.length ); + + Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 ); + + //now do a range scan from the end + + final int half = count/2; + + final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half); + + assertEquals( half, results.size() ); + + for ( int i = 0; i < count / 2; i++ ) { + final MvccLogEntry saved = entries[i]; + final MvccLogEntry returned = results.get( i ); + + assertEquals( "Entry was not equal to the saved value", saved, returned ); + } + + + //now get the next batch + final List<MvccLogEntry> results2 = + logEntryStrategy.loadReversed( context, id, versions[half], count ); + + assertEquals( half, results2.size()); + + for ( int i = 0; i < half; i++ ) { + final MvccLogEntry saved = entries[half + i]; + final MvccLogEntry returned = results2.get( i ); + + assertEquals( "Entry was not equal to the saved value", saved, returned ); + } + + + //now delete them all and ensure we get no results back + for ( int i = 0; i < count; i++ ) { + logEntryStrategy.delete( context, id, versions[i] ).execute(); + } + + final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length ); + + assertEquals( "All log entries were deleted", 0, results3.size() ); + } + + @Test( expected = NullPointerException.class ) public void writeParamsNoContext() throws ConnectionException { logEntryStrategy.write( null, mock( MvccLogEntry.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java new file mode 100644 index 0000000..c653458 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.core.executor; + + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * A task executor that allows you to submit tasks + */ +public class TaskExecutorFactory { + + + /** + * Create a task executor + * @param schedulerName + * @param maxThreadCount + * @param maxQueueSize + * @return + */ + public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount, + final int maxQueueSize ) { + + + final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize ); + + + final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); + + + return threadPool; + } + + + /** + * Create a thread pool that will reject work if our audit tasks become overwhelmed + */ + private static final class MaxSizeThreadPool extends ThreadPoolExecutor { + + public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) { + super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) ); + } + } + + + /** + * Thread factory that will name and count threads for easier debugging + */ + private static final class CountingThreadFactory implements ThreadFactory { + + private final AtomicLong threadCounter = new AtomicLong(); + private final String poolName; + + + private CountingThreadFactory( final String poolName ) {this.poolName = poolName;} + + + @Override + public Thread newThread( final Runnable r ) { + final long newValue = threadCounter.incrementAndGet(); + + final String threadName = poolName + "-" + newValue; + + Thread t = new Thread( r, threadName ); + + //set it to be a daemon thread so it doesn't block shutdown + t.setDaemon( true ); + + return t; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java index d6cc5e8..f28e190 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java @@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.core.rx; -import org.apache.usergrid.persistence.core.task.Task; - import rx.Scheduler; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java deleted file mode 100644 index 9007167..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - - -/** - * Implementation of the task executor with a unique name and size - */ -public class NamedTaskExecutorImpl implements TaskExecutor { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class ); - - private final ListeningExecutorService executorService; - - private final String name; - private final int poolSize; - - - /** - * @param name The name of this instance of the task executor - * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once. - * @param queueLength The length of tasks to keep in the queue - */ - public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) { - Preconditions.checkNotNull( name ); - Preconditions.checkArgument( name.length() > 0, "name must have a length" ); - Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" ); - Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" ); - - this.name = name; - this.poolSize = poolSize; - - // The user has chosen to disable asynchronous execution, - // to create an executor service that will reject all requests - if ( poolSize == 0 ) { - executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService()); - } - - //queue executions as normal - else { - final BlockingQueue<Runnable> queue = queueLength > 0 - ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>(); - - executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) ); - } - } - - - @Override - public <V> ListenableFuture<V> submit( final Task<V> task ) { - - final ListenableFuture<V> future; - - try { - future = executorService.submit( task ); - - // Log our success or failures for debugging purposes - Futures.addCallback( future, new TaskFutureCallBack<V>( task ) ); - } - catch ( RejectedExecutionException ree ) { - return Futures.immediateFuture( task.rejected()); - } - - return future; - } - - - @Override - public void shutdown() { - this.executorService.shutdownNow(); - } - - - /** - * Callback for when the task succeeds or fails. - */ - private static final class TaskFutureCallBack<V> implements FutureCallback<V> { - - private final Task<V> task; - - - private TaskFutureCallBack( Task<V> task ) { - this.task = task; - } - - - @Override - public void onSuccess( @Nullable final V result ) { - LOG.debug( "Successfully completed task ", task ); - } - - - @Override - public void onFailure( final Throwable t ) { - LOG.error( "Unable to execute task. Exception is ", t ); - - task.exceptionThrown( t ); - } - } - - - /** - * Create a thread pool that will reject work if our audit tasks become overwhelmed - */ - private final class MaxSizeThreadPool extends ThreadPoolExecutor { - - public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) { - - super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), - new RejectedHandler() ); - } - } - - - /** - * Thread factory that will name and count threads for easier debugging - */ - private final class CountingThreadFactory implements ThreadFactory { - - private final AtomicLong threadCounter = new AtomicLong(); - - - @Override - public Thread newThread( final Runnable r ) { - final long newValue = threadCounter.incrementAndGet(); - - Thread t = new Thread( r, name + "-" + newValue ); - - t.setDaemon( true ); - - return t; - } - } - - - /** - * The handler that will handle rejected executions and signal the interface - */ - private final class RejectedHandler implements RejectedExecutionHandler { - - - @Override - public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { - LOG.warn( "{} task queue full, rejecting task {}", name, r ); - - throw new RejectedExecutionException( "Unable to run task, queue full" ); - } - - } - - - /** - * Executor implementation that simply rejects all incoming tasks - */ - private static final class RejectingExecutorService implements ExecutorService{ - - @Override - public void shutdown() { - - } - - - @Override - public List<Runnable> shutdownNow() { - return Collections.EMPTY_LIST; - } - - - @Override - public boolean isShutdown() { - return false; - } - - - @Override - public boolean isTerminated() { - return false; - } - - - @Override - public boolean awaitTermination( final long timeout, final TimeUnit unit ) - throws InterruptedException { - return false; - } - - - @Override - public <T> Future<T> submit( final Callable<T> task ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> Future<T> submit( final Runnable task, final T result ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public Future<?> submit( final Runnable task ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks ) - throws InterruptedException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, - final long timeout, final TimeUnit unit ) throws InterruptedException { - - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> T invokeAny( final Collection<? extends Callable<T>> tasks ) - throws InterruptedException, ExecutionException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, - final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - - - @Override - public void execute( final Runnable command ) { - throw new RejectedExecutionException("No Asynchronous tasks allowed"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java deleted file mode 100644 index 5582161..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - - -import java.util.concurrent.Callable; - - -/** - * The task to execute - */ -public interface Task<V> extends Callable<V> { - - - /** - * Invoked when this task throws an uncaught exception. - * @param throwable - */ - void exceptionThrown(final Throwable throwable); - - /** - * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. - * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the - * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the - * request and process later (lazy repair for instance ) do so. - * - */ - V rejected(); - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java deleted file mode 100644 index 5728d2e..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - - -import com.google.common.util.concurrent.ListenableFuture; - - -/** - * An interface for execution of tasks - */ -public interface TaskExecutor { - - /** - * Submit the task asynchronously - * @param task - */ - public <V> ListenableFuture<V> submit( Task<V> task ); - - /** - * Stop the task executor without waiting for scheduled threads to run - */ - public void shutdown(); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java deleted file mode 100644 index 4f95918..0000000 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.core.task; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - - -/** - * Tests for the namedtask execution impl - */ -public class NamedTaskExecutorImplTest { - - - @Test - public void jobSuccess() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - assertEquals( 0l, exceptionLatch.getCount() ); - - assertEquals( 0l, rejectedLatch.getCount() ); - } - - - @Test - public void exceptionThrown() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 1 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - final RuntimeException re = new RuntimeException( "throwing exception" ); - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void call() throws Exception { - super.call(); - throw re; - } - }; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - exceptionLatch.await( 1000, TimeUnit.MILLISECONDS ); - - assertSame( re, task.exceptions.get( 0 ) ); - - - assertEquals( 0l, rejectedLatch.getCount() ); - } - - - @Test - public void noCapacity() throws InterruptedException { - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void call() throws Exception { - super.call(); - - //park this thread so it takes up a task and the next is rejected - final Object mutex = new Object(); - - synchronized ( mutex ) { - mutex.wait(); - } - - return null; - } - }; - - executor.submit( task ); - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //now submit the second task - - - final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch secondRunLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //if we get here we've been rejected, just double check we didn't run - - assertEquals( 1l, secondRunLatch.getCount() ); - assertEquals( 0l, secondExceptionLatch.getCount() ); - } - - - @Test - public void noCapacityWithQueue() throws InterruptedException { - - final int threadPoolSize = 1; - final int queueSize = 10; - - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 0 ); - final CountDownLatch runLatch = new CountDownLatch( 1 ); - - int iterations = threadPoolSize + queueSize; - - for(int i = 0; i < iterations; i ++) { - - final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) { - @Override - public Void call() throws Exception { - super.call(); - - //park this thread so it takes up a task and the next is rejected - final Object mutex = new Object(); - - synchronized ( mutex ) { - mutex.wait(); - } - - return null; - } - }; - executor.submit( task ); - } - - - - runLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //now submit the second task - - - final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch secondRunLatch = new CountDownLatch( 1 ); - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //if we get here we've been rejected, just double check we didn't run - - assertEquals( 1l, secondRunLatch.getCount() ); - assertEquals( 0l, secondExceptionLatch.getCount() ); - } - - - @Test - public void rejectingTaskExecutor() throws InterruptedException { - - final int threadPoolSize = 0; - final int queueSize = 0; - - final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize ); - - final CountDownLatch exceptionLatch = new CountDownLatch( 0 ); - final CountDownLatch rejectedLatch = new CountDownLatch( 1 ); - final CountDownLatch runLatch = new CountDownLatch( 0 ); - - - //now submit the second task - - - - final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {}; - - executor.submit( testTask ); - - - //should be immediately rejected - rejectedLatch.await( 1000, TimeUnit.MILLISECONDS ); - - //if we get here we've been rejected, just double check we didn't run - - assertEquals( 0l, exceptionLatch.getCount() ); - assertEquals( 0l, runLatch.getCount() ); - } - - - private static abstract class TestTask<V> implements Task<V> { - - private final List<Throwable> exceptions; - private final CountDownLatch exceptionLatch; - private final CountDownLatch rejectedLatch; - private final CountDownLatch runLatch; - - - private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch, - final CountDownLatch runLatch ) { - this.exceptionLatch = exceptionLatch; - this.rejectedLatch = rejectedLatch; - this.runLatch = runLatch; - - this.exceptions = new ArrayList<>(); - } - - - - @Override - public void exceptionThrown( final Throwable throwable ) { - this.exceptions.add( throwable ); - exceptionLatch.countDown(); - } - - - @Override - public V rejected() { - rejectedLatch.countDown(); - return null; - } - - - @Override - public V call() throws Exception { - runLatch.countDown(); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java index 4c38c13..987a36c 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java @@ -64,23 +64,23 @@ public interface GraphManager extends CPManager { /** - * @param edge The edge to delete + * @param edge Mark the edge as deleted in the graph * * * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version */ - Observable<Edge> deleteEdge( Edge edge ); + Observable<Edge> markEdge( Edge edge ); /** * - * Remove the node from the graph. + * Mark the node as removed from the graph. * * @param node The node to remove * @param timestamp The timestamp to apply the delete operation. Any edges connected to this node with a timestmap * <= the specified time will be removed from the graph * @return */ - Observable<Id> deleteNode(Id node, long timestamp); + Observable<Id> markNode( Id node, long timestamp ); /** * Get all versions of this edge where versions <= max version http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java index eb49711..d73f767 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java @@ -36,7 +36,7 @@ public interface GraphManagerFactory * * @param collectionScope The context to use when creating the graph manager */ - public GraphManager createEdgeManager( ApplicationScope collectionScope ); + GraphManager createEdgeManager( ApplicationScope collectionScope ); void invalidate(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java index 114440f..38f62b5 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java @@ -59,6 +59,12 @@ public interface SearchByEdge { long getMaxTimestamp(); /** + * Return true if we should filter edges marked for deletion + * @return + */ + boolean filterMarked(); + + /** * The optional start parameter. All edges emitted with be > the specified start edge. * This is useful for paging. Simply use the last value returned in the previous call in the start parameter * @return http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java index 5e47ae0..f213b00 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java @@ -65,6 +65,12 @@ public interface SearchByEdgeType { */ Order getOrder(); + /** + * Return true to filter marked edges from the results + * @return + */ + boolean filterMarked(); + /** * Options for ordering. By default, we want to perform descending for common use cases and read speed. This is our our data http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java index 6cdaef0..4b628d1 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java @@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin; import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship; import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet; import org.apache.usergrid.persistence.core.migration.schema.Migration; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener; @@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule { } - @Inject - @Singleton - @Provides - @GraphTaskExecutor - public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) { - return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), - graphFig.getShardAuditWorkerQueueSize() ); - } - /**
