http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java deleted file mode 100644 index e5c2896..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.apache.usergrid.persistence.collection.serialization.impl; - - -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.UUID; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Preconditions; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - - -/** - * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion - */ -public class LogEntryIterator implements Iterator<MvccLogEntry> { - - - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private final ApplicationScope scope; - private final Id entityId; - private final int pageSize; - - - private Iterator<MvccLogEntry> elementItr; - private UUID nextStart; - - - /** - * @param logEntrySerializationStrategy The serialization strategy to get the log entries - * @param scope The scope of the entity - * @param entityId The id of the entity - * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version - * < max - * @param pageSize The fetch size to get when querying the serialization strategy - */ - public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final ApplicationScope scope, final Id entityId, final UUID maxVersion, - final int pageSize ) { - - Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); - - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.scope = scope; - this.entityId = entityId; - this.nextStart = maxVersion; - this.pageSize = pageSize; - } - - - @Override - public boolean hasNext() { - if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { - try { - advance(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to query cassandra", e ); - } - } - - return elementItr.hasNext(); - } - - - @Override - public MvccLogEntry next() { - if ( !hasNext() ) { - throw new NoSuchElementException( "No more elements exist" ); - } - - return elementItr.next(); - } - - - @Override - public void remove() { - throw new UnsupportedOperationException( "Remove is unsupported" ); - } - - - /** - * Advance our iterator - */ - public void advance() throws ConnectionException { - - final int requestedSize = pageSize + 1; - - //loop through even entry that's < this one and remove it - List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); - - //we always remove the first version if it's equal since it's returned - if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { - results.remove( 0 ); - } - - - //we have results, set our next start - if ( results.size() == pageSize ) { - nextStart = results.get( results.size() - 1 ).getVersion(); - } - //nothing left to do - else { - nextStart = null; - } - - elementItr = results.iterator(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java new file mode 100644 index 0000000..072e0ea --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; + +import rx.Observable; +import rx.Subscriber; + + +/** + * An observable that emits log entries from MIN to MAX + */ +public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{ + + private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; + + + public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) { + this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; + } + + + @Override + public void call( final Subscriber<? super MvccLogEntry> subscriber ) { + + subscriber.onStart(); + + while(!subscriber.isUnsubscribed()){ + + + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java new file mode 100644 index 0000000..a8e15a7 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java @@ -0,0 +1,114 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Preconditions; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + + +/** + * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion + */ +public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> { + + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final ApplicationScope scope; + private final Id entityId; + private final int pageSize; + + + private Iterator<MvccLogEntry> elementItr; + private UUID nextStart; + + + /** + * @param logEntrySerializationStrategy The serialization strategy to get the log entries + * @param scope The scope of the entity + * @param entityId The id of the entity + * @param maxVersion The max version of the entity. Iterator will iterate from min to min starting with the version + * < max + * @param pageSize The fetch size to get when querying the serialization strategy + */ + public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final ApplicationScope scope, final Id entityId, final UUID maxVersion, + final int pageSize ) { + + Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); + + this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.scope = scope; + this.entityId = entityId; + this.nextStart = maxVersion; + this.pageSize = pageSize; + } + + + @Override + public boolean hasNext() { + if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { + try { + advance(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to query cassandra", e ); + } + } + + return elementItr.hasNext(); + } + + + @Override + public MvccLogEntry next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more elements exist" ); + } + + return elementItr.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove is unsupported" ); + } + + + /** + * Advance our iterator + */ + public void advance() throws ConnectionException { + + final int requestedSize = pageSize + 1; + + //loop through even entry that's < this one and remove it + List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); + + //we always remove the first version if it's equal since it's returned + if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { + results.remove( 0 ); + } + + + //we have results, set our next start + if ( results.size() == pageSize ) { + nextStart = results.get( results.size() - 1 ).getVersion(); + } + //nothing left to do + else { + nextStart = null; + } + + elementItr = results.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java index 81c0248..fdef3c3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java @@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa @Override + public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId, + final UUID minVersion, final int maxSize ) { + + final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip(); + + if ( migration.needsMigration() ) { + return migration.from.loadReversed( applicationScope, entityId, minVersion, maxSize ); + } + + return migration.to.loadReversed( applicationScope, entityId, minVersion, maxSize ); + } + + + @Override public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) { final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java index 76e9dba..73804e4 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java @@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,11 +30,6 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.IntegerType; -import org.apache.cassandra.db.marshal.ReversedType; -import org.apache.cassandra.db.marshal.UUIDType; - import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException; @@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; -import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -126,12 +117,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo //didnt put the max in the error message, I don't want to take the string construction hit every time Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(), - "requested size cannot be over configured maximum" ); + "requested size cannot be over configured maximum" ); final Id applicationId = collectionScope.getApplication(); - final Id ownerId = applicationId; - final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() ); @@ -155,7 +144,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo } catch ( ConnectionException e ) { throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", - e ); + e ); } @@ -181,7 +170,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final StageStatus stageStatus = column.getValue( SER ); final MvccLogEntry logEntry = - new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state ); + new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state ); versionResults.addEntry( logEntry ); @@ -208,13 +197,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final ScopedRowKey<K> rowKey = createKey( applicationId, entityId ); + columns = + keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize ) + .execute().getResult(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to load log entries", e ); + } + + return parseResults( columns, entityId ); + } + + + @Override + public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId, + final UUID minVersion, final int maxSize ) { + ColumnList<UUID> columns; + try { + + final Id applicationId = applicationScope.getApplication(); + + final ScopedRowKey<K> rowKey = createKey( applicationId, entityId ); + + columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ) - .withColumnRange( version, null, false, maxSize ).execute().getResult(); + .withColumnRange( minVersion, null, true, maxSize ).execute().getResult(); } catch ( ConnectionException e ) { throw new RuntimeException( "Unable to load log entries", e ); } + return parseResults( columns, entityId ); + } + + + private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) { List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() ); @@ -245,8 +262,6 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo } - - /** * Simple callback to perform puts and deletes with a common row setup code */ @@ -281,12 +296,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo return batch; } + protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily(); - protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId); + protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId ); + + protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key ); - protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key); /** * Internal stage shard @@ -319,7 +336,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo */ private static class StatusCache { private Map<Integer, MvccLogEntry.State> values = - new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length ); + new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length ); private StatusCache() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java index 3168817..e825bbc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java @@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask; -import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl; -import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; -import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl; import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.core.migration.data.DataMigrationException; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> private final Keyspace keyspace; private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions; - private final EntityVersionTaskFactory entityVersionCleanupFactory; private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; @@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> @Inject public MvccEntityDataMigrationImpl( final Keyspace keyspace, final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions, - final EntityVersionTaskFactory entityVersionCleanupFactory, final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) { this.keyspace = keyspace; this.allVersions = allVersions; - this.entityVersionCleanupFactory = entityVersionCleanupFactory; this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; @@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> atomicLong.addAndGet( entities.size() ); - List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() ); + final List<Id> toSaveIds = new ArrayList<>( entities.size() ); + for ( EntityToSaveMessage message : entities ) { final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity ); @@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> final UUID version = message.entity.getVersion(); + + toSaveIds.add( entityId ); + // re-write the unique // values // but this @@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> * Migrate the log entry to the new format */ for(final MvccLogEntry entry: logEntries){ - final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry ); totalBatch.mergeShallow( mb ); @@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> - //schedule our cleanup task to clean up all the data - final EntityVersionCleanupTask task = entityVersionCleanupFactory - .getCleanupTask( message.scope, message.entity.getId(), version, false ); - entityVersionCleanupTasks.add( task ); } executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong ); //now run our cleanup task - for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) { - try { - entityVersionCleanupTask.call(); - } - catch ( Exception e ) { - LOGGER.error( "Unable to run cleanup task", e ); - } + for ( Id updatedId : toSaveIds ) { + + + + } } ).subscribeOn( Schedulers.io() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java deleted file mode 100644 index 8c26c5b..0000000 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java +++ /dev/null @@ -1,715 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; - -import org.junit.AfterClass; -import org.junit.Test; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.UniqueValue; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; -import org.apache.usergrid.persistence.collection.util.LogEntryMock; -import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock; -import org.apache.usergrid.persistence.collection.util.VersionGenerator; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; - -import com.google.common.util.concurrent.ListenableFuture; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -/** - * Cleanup task tests - */ -public class EntityVersionCleanupTaskTest { - - private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 ); - - - @AfterClass - public static void shutdown() { - taskExecutor.shutdown(); - } - - - @Test( timeout = 10000 ) - public void noListenerOneVersion() throws Exception { - - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - // intentionally no events - final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>(); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - final List<UUID> versions = VersionGenerator.generateVersions( 2 ); - - // mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, - version, false ); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - cleanupTask.call(); - - - //get the second field, this should be deleted - final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); - - final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); - - - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - } - - - /** - * Tests the cleanup task on the first version created - */ - @Test( timeout = 10000 ) - public void noListenerNoVersions() throws Exception { - - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - // intentionally no events - final Set<EntityVersionDeleted> listeners = new HashSet<>(); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - - final List<UUID> versions = VersionGenerator.generateVersions( 1 ); - - // mock up a single log entry, with no other entries - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false ); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - cleanupTask.call(); - - - //verify delete was never invoked - verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) ); - - //verify the delete was never invoked - verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) ); - } - - - @Test( timeout = 10000 ) - public void singleListenerSingleVersion() throws Exception { - - - //create a latch for the event listener, and add it to the list of events - final int sizeToReturn = 1; - - final CountDownLatch latch = new CountDownLatch( sizeToReturn ); - - final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch ); - - final Set<EntityVersionDeleted> listeners = new HashSet<>(); - - listeners.add( eventListener ); - - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - - final List<UUID> versions = VersionGenerator.generateVersions( 2 ); - - - // mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, - version, false ); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - cleanupTask.call(); - - - //get the second field, this should be deleted - final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); - - final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); - - - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - - //the latch was executed - latch.await(); - } - - - @Test//(timeout=10000) - public void multipleListenerMultipleVersions() throws Exception { - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - - //create a latch for the event listener, and add it to the list of events - final int sizeToReturn = 10; - - final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 ); - - final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch ); - final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch ); - final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch ); - - final Set<EntityVersionDeleted> listeners = new HashSet<>(); - - listeners.add( listener1 ); - listeners.add( listener2 ); - listeners.add( listener3 ); - - - - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - - - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - final List<UUID> versions = VersionGenerator.generateVersions( 2 ); - - - // mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, - version, false ); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - cleanupTask.call(); - - - //get the second field, this should be deleted - final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); - - final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); - - - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - - //the latch was executed - latch.await(); - - //we deleted the version - //verify we deleted everything - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - //the latch was executed - latch.await(); - } - - - /** - * Tests what happens when our listeners are VERY slow - */ -// @Ignore( "Test is a work in progress" ) - @Test( timeout = 10000 ) - public void multipleListenerMultipleVersionsNoThreadsToRun() - throws ExecutionException, InterruptedException, ConnectionException { - - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - - //create a latch for the event listener, and add it to the list of events - final int sizeToReturn = 10; - - - final int listenerCount = 5; - - final CountDownLatch latch = - new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount ); - final Semaphore waitSemaphore = new Semaphore( 0 ); - - - final SlowListener listener1 = new SlowListener( latch, waitSemaphore ); - final SlowListener listener2 = new SlowListener( latch, waitSemaphore ); - final SlowListener listener3 = new SlowListener( latch, waitSemaphore ); - final SlowListener listener4 = new SlowListener( latch, waitSemaphore ); - final SlowListener listener5 = new SlowListener( latch, waitSemaphore ); - - final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>(); - - listeners.add( listener1 ); - listeners.add( listener2 ); - listeners.add( listener3 ); - listeners.add( listener4 ); - listeners.add( listener5 ); - - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - - final List<UUID> versions = VersionGenerator.generateVersions( 2 ); - - // mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, - version, false); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - //start the task - ListenableFuture<Void> future = taskExecutor.submit( cleanupTask ); - - /** - * While we're not done, release latches every 200 ms - */ - while ( !future.isDone() ) { - Thread.sleep( 200 ); - waitSemaphore.release( listenerCount ); - } - - //wait for the task - future.get(); - - - //get the second field, this should be deleted - final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); - - final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); - - - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - - //the latch was executed - latch.await(); - - //we deleted the version - //verify we deleted everything - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - //the latch was executed - latch.await(); - - - //the latch was executed - latch.await(); - } - - - /** - * Tests that our task will run in the caller if there's no threads, ensures that the task runs - */ - @Test( timeout = 10000 ) - public void singleListenerSingleVersionRejected() - throws ExecutionException, InterruptedException, ConnectionException { - - - - //create a latch for the event listener, and add it to the list of events - final int sizeToReturn = 1; - - final CountDownLatch latch = new CountDownLatch( sizeToReturn ); - - final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch ); - - final Set<EntityVersionDeleted> listeners = new HashSet<>(); - - listeners.add( eventListener ); - - - final SerializationFig serializationFig = mock( SerializationFig.class ); - - when( serializationFig.getBufferSize() ).thenReturn( 10 ); - - final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class ); - - final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class ); - - final Keyspace keyspace = mock( Keyspace.class ); - - final MutationBatch entityBatch = mock( MutationBatch.class ); - - when( keyspace.prepareMutationBatch() ).thenReturn( - mock( MutationBatch.class ) ) // don't care what happens to this one - .thenReturn( entityBatch ); - - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl( applicationId ); - - final Id entityId = new SimpleId( "user" ); - - - final List<UUID> versions = VersionGenerator.generateVersions( 2 ); - - - // mock up a single log entry for our first test - final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions ); - - - //get the version we're keeping, it's first in our list - final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion(); - - //mock up unique version output - final UniqueValueEntryMock uniqueValueEntryMock = - UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions ); - - - EntityVersionCleanupTask cleanupTask = - new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, - version, false ); - - final MutationBatch newBatch = mock( MutationBatch.class ); - - - // set up returning a mutator - when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch ); - - //return a new batch when it's called - when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch ); - - - cleanupTask.rejected(); - - - //get the second field, this should be deleted - final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 ); - - final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 ); - - - //verify delete was invoked - verify( uvss ).delete( same( appScope ), same( oldUniqueField ) ); - - //verify the delete was invoked - verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) ); - - // verify it was run - verify( entityBatch ).execute(); - - - //the latch was executed - latch.await(); - } - - - private static class EntityVersionDeletedTest implements EntityVersionDeleted { - final CountDownLatch invocationLatch; - - - private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) { - this.invocationLatch = invocationLatch; - } - - - @Override - public void versionDeleted( final ApplicationScope scope, final Id entityId, - final List<MvccLogEntry> entityVersion ) { - invocationLatch.countDown(); - } - } - - - private static class SlowListener extends EntityVersionDeletedTest { - final Semaphore blockLatch; - - - private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) { - super( invocationLatch ); - this.blockLatch = blockLatch; - } - - - @Override - public void versionDeleted( final ApplicationScope scope, final Id entityId, - final List<MvccLogEntry> entityVersion ) { - - //wait for unblock to happen before counting down invocation latches - try { - blockLatch.acquire(); - } - catch ( InterruptedException e ) { - throw new RuntimeException( e ); - } - super.versionDeleted( scope, entityId, entityVersion ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java deleted file mode 100644 index e993fad..0000000 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Test; - -import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; - -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -/** - * Created task tests. - */ -public class EntityVersionCreatedTaskTest { - - private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 ); - - @AfterClass - public static void shutdown() { - taskExecutor.shutdown(); - } - - - @Test(timeout=10000) - public void noListener() - throws ExecutionException, InterruptedException, ConnectionException { - - // create a latch for the event listener, and add it to the list of events - - final Set<EntityVersionCreated> listeners = mock( Set.class ); - - when ( listeners.size()).thenReturn( 0 ); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); - - final Id entityId = new SimpleId( "user" ); - final Entity entity = new Entity( entityId ); - - // start the task - - EntityVersionCreatedTask entityVersionCreatedTask = - new EntityVersionCreatedTask( appScope, listeners, entity); - - try { - entityVersionCreatedTask.call(); - }catch(Exception e){ - Assert.fail(e.getMessage()); - } - - - // wait for the task - // future.get(); - - //mocked listener makes sure that the task is called - verify( listeners ).size(); - - } - @Test(timeout=10000) - public void oneListener() - throws ExecutionException, InterruptedException, ConnectionException { - - // create a latch for the event listener, and add it to the list of events - - final int sizeToReturn = 1; - - final CountDownLatch latch = new CountDownLatch( sizeToReturn ); - - final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch); - - final Set<EntityVersionCreated> listeners = mock( Set.class ); - final Iterator<EntityVersionCreated> helper = mock(Iterator.class); - - when ( listeners.size()).thenReturn( 1 ); - when ( listeners.iterator()).thenReturn( helper ); - when ( helper.next() ).thenReturn( eventListener ); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); - - final Id entityId = new SimpleId( "user" ); - final Entity entity = new Entity( entityId ); - - // start the task - - EntityVersionCreatedTask entityVersionCreatedTask = - new EntityVersionCreatedTask( appScope, listeners, entity); - - try { - entityVersionCreatedTask.call(); - }catch(Exception e){ - - Assert.fail(e.getMessage()); - } - //mocked listener makes sure that the task is called - verify( listeners ).size(); - verify( listeners ).iterator(); - verify( helper ).next(); - - } - - @Test(timeout=10000) - public void multipleListener() - throws ExecutionException, InterruptedException, ConnectionException { - - final int sizeToReturn = 3; - - final Set<EntityVersionCreated> listeners = mock( Set.class ); - final Iterator<EntityVersionCreated> helper = mock(Iterator.class); - - when ( listeners.size()).thenReturn( 3 ); - when ( listeners.iterator()).thenReturn( helper ); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); - - final Id entityId = new SimpleId( "user" ); - final Entity entity = new Entity( entityId ); - - // start the task - - EntityVersionCreatedTask entityVersionCreatedTask = - new EntityVersionCreatedTask( appScope, listeners, entity); - - final CountDownLatch latch = new CountDownLatch( sizeToReturn ); - - final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch); - final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch); - final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch); - - when ( helper.next() ).thenReturn( listener1,listener2,listener3); - - try { - entityVersionCreatedTask.call(); - }catch(Exception e){ - ; - } - //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask ); - - //wait for the task - //intentionally fails due to difficulty mocking observable - - //mocked listener makes sure that the task is called - verify( listeners ).size(); - //verifies that the observable made listener iterate. - verify( listeners ).iterator(); - } - - @Test(timeout=10000) - public void oneListenerRejected() - throws ExecutionException, InterruptedException, ConnectionException { - - // create a latch for the event listener, and add it to the list of events - - final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 ); - - final int sizeToReturn = 1; - - final CountDownLatch latch = new CountDownLatch( sizeToReturn ); - - final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch); - - final Set<EntityVersionCreated> listeners = mock( Set.class ); - final Iterator<EntityVersionCreated> helper = mock(Iterator.class); - - when ( listeners.size()).thenReturn( 1 ); - when ( listeners.iterator()).thenReturn( helper ); - when ( helper.next() ).thenReturn( eventListener ); - - final Id applicationId = new SimpleId( "application" ); - - final ApplicationScope appScope = new ApplicationScopeImpl(applicationId); - - final Id entityId = new SimpleId( "user" ); - final Entity entity = new Entity( entityId ); - - // start the task - - EntityVersionCreatedTask entityVersionCreatedTask = - new EntityVersionCreatedTask( appScope, listeners, entity); - - entityVersionCreatedTask.rejected(); - - //mocked listener makes sure that the task is called - verify( listeners ).size(); - verify( listeners ).iterator(); - verify( helper ).next(); - - } - - private static class EntityVersionCreatedTest implements EntityVersionCreated { - final CountDownLatch invocationLatch; - - private EntityVersionCreatedTest( final CountDownLatch invocationLatch) { - this.invocationLatch = invocationLatch; - } - - @Override - public void versionCreated( final ApplicationScope scope, final Entity entity ) { - invocationLatch.countDown(); - } - } -}
