http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java new file mode 100644 index 0000000..0034f03 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.Collections; +import java.util.Iterator; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.ObservableIterator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import rx.Observable; + + +/** + * Runs on an entity that as just be mark committed, and removes all unique values <= this entity + */ +public class UniqueCleanup + implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> { + + + private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class ); + private final Timer uniqueCleanupTimer; + + + private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + private final Keyspace keyspace; + + private final SerializationFig serializationFig; + + + @Inject + public UniqueCleanup( final SerializationFig serializationFig, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final Keyspace keyspace, final 
MetricsFactory metricsFactory ) { + + this.serializationFig = serializationFig; + this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; + this.keyspace = keyspace; + this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" ); + } + + + @Override + public Observable<CollectionIoEvent<MvccEntity>> call( + final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) { + + final Observable<CollectionIoEvent<MvccEntity>> outputObservable = + collectionIoEventObservable.doOnNext( mvccEntityCollectionIoEvent -> { + + final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId(); + final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection(); + final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion(); + + //TODO Refactor this logic into a a class that can be invoked from anywhere + //iterate all unique values + final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = + Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { + @Override + protected Iterator<UniqueValue> getIterator() { + return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); + } + } ) + + //skip versions > the specified version + .skipWhile( uniqueValue -> { + + final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); + + return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0; + } ) + + //buffer our buffer size, then roll them all up in a single batch mutation + .buffer( serializationFig.getBufferSize() ) + + //roll them up + .doOnNext( uniqueValues -> { + final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); + + + for ( UniqueValue value : uniqueValues ) { + uniqueCleanupBatch + .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); + } + + try { + uniqueCleanupBatch.execute(); + } + catch ( ConnectionException e ) { + throw new 
RuntimeException( "Unable to execute batch mutation", e ); + } + } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); + } ); + + return ObservableTimer.time( outputObservable, uniqueCleanupTimer ); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java new file mode 100644 index 0000000..424ec86 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import rx.Observable; + + +/** + * Compact all versions on the input observable by removing them from the log, and from the + * versions + */ +public class VersionCompact + implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> { + + private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class ); + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy; + private final SerializationFig serializationFig; + private final Keyspace keyspace; + private final Timer compactTimer; + + + @Inject + public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final SerializationFig serializationFig, final Keyspace keyspace, + final MetricsFactory metricsFactory, + final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) { + 
this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.serializationFig = serializationFig; + this.keyspace = keyspace; + this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy; + this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" ); + } + + + @Override + public Observable<CollectionIoEvent<MvccLogEntry>> call( + final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) { + + + final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer = + collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap( + buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(), + ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> { + + final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection(); + final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent(); + final Id entityId = mvccLogEntry.getEntityId(); + final UUID version = mvccLogEntry.getVersion(); + + if ( logger.isDebugEnabled() ) { + logger.debug( + "Deleting log entry and version data for entity id {} and version {} in app scope {}", + new Object[] { entityId, version, scope } ); + } + + + //delete from our log + mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) ); + + //merge our entity delete in + mutationBatch + .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) ); + + + + } ) ) + //delete from the entities + .doOnNext( mutationBatch -> { + try { + mutationBatch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to perform batch mutation" ); + } + } ).flatMap( batches -> Observable.from( buffer ) ) ); + + + return ObservableTimer.time( entryBuffer, compactTimer ); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java index 92669a7..5e249ae 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java @@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD */ List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize ); + + + /** + * Load a list, from lowest to highest, of the stage with versions >= minVersion up to maxSize elements + * + * @param applicationScope The applicationScope to load the entity from + * @param entityId The entity id to load + * @param minVersion The min version to seek from. Null is allowed + * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load. 
+ * + * @return A list of entries up to max size ordered from min(UUID) => max(UUID) + */ + List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize ); + /** * MarkCommit the stage from the applicationScope with the given entityId and version * http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java index 381a24e..6591781 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java @@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig { @Default("5") int getTimeout(); - /** - * Number of history items to return for delete. - * - * @return Timeout in seconds. - */ - @Key("collection.delete.history.size") - @Default("100") - int getHistorySize(); /** * Number of items to buffer. * - * @return Timeout in seconds. 
+ * @return Number of items to buffer in memory */ - @Key("collection.buffer.size") - @Default("10") + @Key("buffer.size") + @Default("100") int getBufferSize(); /** - * The size of threads to have in the task pool - */ - @Key( "collection.task.pool.threadsize" ) - @Default( "20" ) - int getTaskPoolThreadSize(); - - - - /** - * The size of threads to have in the task pool - */ - @Key( "collection.task.pool.queuesize" ) - @Default( "20" ) - int getTaskPoolQueueSize(); - - /** * The maximum amount of entities we can load in a single request * TODO, change this and move it into a common setting that both query and collection share */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java deleted file mode 100644 index e5c2896..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.apache.usergrid.persistence.collection.serialization.impl; - - -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.UUID; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Preconditions; -import 
com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - - -/** - * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion - */ -public class LogEntryIterator implements Iterator<MvccLogEntry> { - - - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private final ApplicationScope scope; - private final Id entityId; - private final int pageSize; - - - private Iterator<MvccLogEntry> elementItr; - private UUID nextStart; - - - /** - * @param logEntrySerializationStrategy The serialization strategy to get the log entries - * @param scope The scope of the entity - * @param entityId The id of the entity - * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version - * < max - * @param pageSize The fetch size to get when querying the serialization strategy - */ - public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final ApplicationScope scope, final Id entityId, final UUID maxVersion, - final int pageSize ) { - - Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); - - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.scope = scope; - this.entityId = entityId; - this.nextStart = maxVersion; - this.pageSize = pageSize; - } - - - @Override - public boolean hasNext() { - if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { - try { - advance(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to query cassandra", e ); - } - } - - return elementItr.hasNext(); - } - - - @Override - public MvccLogEntry next() { - if ( !hasNext() ) { - throw new NoSuchElementException( "No more elements exist" ); - } - - return elementItr.next(); - } - - - @Override - public void remove() { - throw new UnsupportedOperationException( "Remove is unsupported" ); - } - - - /** - * Advance our iterator - */ - public void 
advance() throws ConnectionException { - - final int requestedSize = pageSize + 1; - - //loop through even entry that's < this one and remove it - List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); - - //we always remove the first version if it's equal since it's returned - if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { - results.remove( 0 ); - } - - - //we have results, set our next start - if ( results.size() == pageSize ) { - nextStart = results.get( results.size() - 1 ).getVersion(); - } - //nothing left to do - else { - nextStart = null; - } - - elementItr = results.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java new file mode 100644 index 0000000..eae8c06 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java @@ -0,0 +1,121 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import 
com.google.common.base.Preconditions; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + + +/** + * Iterator over the log entries of an entity, read page by page through + * MvccLogEntrySerializationStrategy.loadReversed. The first page is requested with a null start version; each + * following page is requested starting at the last version of the previous page. + */ +public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> { + + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final ApplicationScope scope; + private final Id entityId; + private final int pageSize; + + + private Iterator<MvccLogEntry> elementItr; + private UUID nextStart; + + + /** + * @param logEntrySerializationStrategy The serialization strategy to get the log entries + * @param scope The scope of the entity + * @param entityId The id of the entity + * @param pageSize The fetch size to get when querying the serialization strategy + */ + public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final ApplicationScope scope, final Id entityId, final int pageSize ) { + + Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); + + this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.scope = scope; + this.entityId = entityId; + this.pageSize = pageSize; + } + + + @Override + public boolean hasNext() { + //load a page on first use, or when the current page is exhausted and a next start version is known + if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { + try { + advance(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to query cassandra", e ); + } + } + + return elementItr.hasNext(); + } + + + @Override + public MvccLogEntry next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more elements exist" ); + } + + return elementItr.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove is unsupported" ); + } + + + /** + * Advance our iterator by loading the next page. On every page after the first one extra element is + * requested, because the start version itself is returned again and is dropped from the results. + */ + public void advance() throws ConnectionException { + + final int requestedSize; + + if ( nextStart != null ) { + requestedSize = pageSize + 
1; + } + else { + requestedSize = pageSize; + } + + //load the next page, seeking from nextStart (null on the first page) + List<MvccLogEntry> results = logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize ); + + //we always remove the first version if it's equal since it's returned + if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { + results.remove( 0 ); + } + + + + //we have results, set our next start. If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >= + if ( results.size() >= pageSize ) { + nextStart = results.get( results.size() - 1 ).getVersion(); + } + //nothing left to do + else { + nextStart = null; + } + + + + + elementItr = results.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java index 81c0248..fdef3c3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java @@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa @Override + public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId, + final UUID minVersion, final 
int maxSize ) { + + final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip(); + + if ( migration.needsMigration() ) { + return migration.from.loadReversed( applicationScope, entityId, minVersion, maxSize ); + } + + return migration.to.loadReversed( applicationScope, entityId, minVersion, maxSize ); + } + + + @Override public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) { final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java index 76e9dba..0c0d961 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java @@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,11 +30,6 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.marshal.BytesType; -import 
org.apache.cassandra.db.marshal.IntegerType; -import org.apache.cassandra.db.marshal.ReversedType; -import org.apache.cassandra.db.marshal.UUIDType; - import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException; @@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; -import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -98,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final UUID colName = entry.getVersion(); final StageStatus stageStatus = new StageStatus( stage, entry.getState() ); - return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() { - @Override - public void doOp( final ColumnListMutation<UUID> colMutation ) { - - //Write the stage with a timeout, it's set as transient - if ( stage.isTransient() ) { - colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() ); - return; - } + return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> { - //otherwise it's persistent, write it with no expiration - colMutation.putColumn( 
colName, stageStatus, SER, null ); + //Write the stage with a timeout, it's set as transient + if ( stage.isTransient() ) { + colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() ); + return; } + + //otherwise it's persistent, write it with no expiration + colMutation.putColumn( colName, stageStatus, SER, null ); } ); } @@ -126,12 +114,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo //didnt put the max in the error message, I don't want to take the string construction hit every time Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(), - "requested size cannot be over configured maximum" ); + "requested size cannot be over configured maximum" ); final Id applicationId = collectionScope.getApplication(); - final Id ownerId = applicationId; - final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() ); @@ -155,7 +141,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo } catch ( ConnectionException e ) { throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", - e ); + e ); } @@ -181,7 +167,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final StageStatus stageStatus = column.getValue( SER ); final MvccLogEntry logEntry = - new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state ); + new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state ); versionResults.addEntry( logEntry ); @@ -208,13 +194,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final ScopedRowKey<K> rowKey = createKey( applicationId, entityId ); + columns = + keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize ) + .execute().getResult(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to load log entries", e ); + } + + return parseResults( 
columns, entityId ); + } + + + @Override + public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId, + final UUID minVersion, final int maxSize ) { + ColumnList<UUID> columns; + try { + + final Id applicationId = applicationScope.getApplication(); + + final ScopedRowKey<K> rowKey = createKey( applicationId, entityId ); + + columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ) - .withColumnRange( version, null, false, maxSize ).execute().getResult(); + .withColumnRange( minVersion, null, true, maxSize ).execute().getResult(); } catch ( ConnectionException e ) { throw new RuntimeException( "Unable to load log entries", e ); } + return parseResults( columns, entityId ); + } + + + private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) { List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() ); @@ -236,17 +250,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo Preconditions.checkNotNull( entityId, "entityId is required" ); Preconditions.checkNotNull( version, "version context is required" ); - return doWrite( context, entityId, version, new RowOp() { - @Override - public void doOp( final ColumnListMutation<UUID> colMutation ) { - colMutation.deleteColumn( version ); - } - } ); + return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) ); } - - /** * Simple callback to perform puts and deletes with a common row setup code */ @@ -281,12 +288,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo return batch; } + protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily(); - protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId); + protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId ); + + protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key ); - 
protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key); /** * Internal stage shard @@ -319,7 +328,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo */ private static class StatusCache { private Map<Integer, MvccLogEntry.State> values = - new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length ); + new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length ); private StatusCache() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java index 3168817..e825bbc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java @@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask; -import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import 
org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl; -import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; -import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl; import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.core.migration.data.DataMigrationException; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> private final Keyspace keyspace; private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions; - private final EntityVersionTaskFactory entityVersionCleanupFactory; private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; @@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> @Inject public MvccEntityDataMigrationImpl( final Keyspace keyspace, final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions, - final EntityVersionTaskFactory entityVersionCleanupFactory, final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) { this.keyspace = keyspace; this.allVersions = allVersions; - this.entityVersionCleanupFactory = 
entityVersionCleanupFactory; this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; @@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> atomicLong.addAndGet( entities.size() ); - List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() ); + final List<Id> toSaveIds = new ArrayList<>( entities.size() ); + for ( EntityToSaveMessage message : entities ) { final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity ); @@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> final UUID version = message.entity.getVersion(); + + toSaveIds.add( entityId ); + // re-write the unique // values // but this @@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> * Migrate the log entry to the new format */ for(final MvccLogEntry entry: logEntries){ - final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry ); totalBatch.mergeShallow( mb ); @@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> - //schedule our cleanup task to clean up all the data - final EntityVersionCleanupTask task = entityVersionCleanupFactory - .getCleanupTask( message.scope, message.entity.getId(), version, false ); - entityVersionCleanupTasks.add( task ); } executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong ); //now run our cleanup task - for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) { - try { - entityVersionCleanupTask.call(); - } - catch ( Exception e ) { - LOGGER.error( "Unable to run cleanup task", e ); - } + for ( Id updatedId : toSaveIds ) { + + + + } } ).subscribeOn( Schedulers.io() 
); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java index baceeb4..36faf62 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java @@ -91,8 +91,7 @@ public class EntityCollectionManagerIT { public void write() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); @@ -113,8 +112,7 @@ public class EntityCollectionManagerIT { public void writeWithUniqueValues() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -146,8 +144,7 @@ public class EntityCollectionManagerIT { public void writeAndLoad() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); @@ -174,7 +171,7 @@ public class EntityCollectionManagerIT { public void writeLoadDelete() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + 
ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -185,30 +182,27 @@ public class EntityCollectionManagerIT { assertNotNull( "Id was assigned", createReturned.getId() ); - - UUID version = createReturned.getVersion(); - Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - manager.delete( createReturned.getId() ).toBlocking().last(); + manager.mark( createReturned.getId() ).toBlocking().last(); loadObservable = manager.load( createReturned.getId() ); //load may return null, use last or default loadReturned = loadObservable.toBlocking().lastOrDefault( null ); - assertNull("Entity was deleted", loadReturned); + assertNull( "Entity was deleted", loadReturned ); } @Test public void writeLoadUpdateLoad() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); newEntity.setField( new IntegerField( "counter", 1 ) ); @@ -217,36 +211,36 @@ public class EntityCollectionManagerIT { Observable<Entity> observable = manager.write( newEntity ); - Entity createReturned = observable.toBlocking().lastOrDefault(null); + Entity createReturned = observable.toBlocking().lastOrDefault( null ); assertNotNull( "Id was assigned", createReturned.getId() ); - Observable<Entity> loadObservable = manager.load(createReturned.getId()); + Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - assertEquals("Field value correct", 
createReturned.getField("counter"), loadReturned.getField("counter")); + assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) ); //update the field to 2 createReturned.setField( new IntegerField( "counter", 2 ) ); //wait for the write to complete - manager.write( createReturned ).toBlocking().lastOrDefault(null); + manager.write( createReturned ).toBlocking().lastOrDefault( null ); loadObservable = manager.load( createReturned.getId() ); - loadReturned = loadObservable.toBlocking().lastOrDefault(null); + loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter")); + assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) ); } @@ -254,30 +248,30 @@ public class EntityCollectionManagerIT { public void writeAndLoadScopeClosure() { - ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); - EntityCollectionManager manager = factory.createCollectionManager(collectionScope1); + EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 ); - Observable<Entity> observable = manager.write(newEntity); + Observable<Entity> observable = manager.write( newEntity ); Entity createReturned = observable.toBlocking().lastOrDefault( null ); - assertNotNull("Id was assigned", createReturned.getId()); - assertNotNull("Version was assigned", createReturned.getVersion()); + assertNotNull( "Id was assigned", createReturned.getId() ); + assertNotNull( "Version was assigned", createReturned.getVersion() ); Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity 
loadReturned = loadObservable.toBlocking().lastOrDefault( null ); - assertEquals("Same value", createReturned, loadReturned); + assertEquals( "Same value", createReturned, loadReturned ); - ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); //now make sure we can't load it from another scope, using the same org @@ -286,9 +280,7 @@ public class EntityCollectionManagerIT { Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null ); - assertNull("CollectionScope works correctly", loaded); - - + assertNull( "CollectionScope works correctly", loaded ); } @@ -296,34 +288,32 @@ public class EntityCollectionManagerIT { public void writeAndGetField() { - ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); Field field = new StringField( "testField", "unique", true ); - newEntity.setField(field); + newEntity.setField( field ); EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 ); - Observable<Entity> observable = manager.write(newEntity); + Observable<Entity> observable = manager.write( newEntity ); Entity createReturned = observable.toBlocking().lastOrDefault( null ); assertNotNull( "Id was assigned", createReturned.getId() ); - assertNotNull("Version was assigned", createReturned.getVersion()); + assertNotNull( "Version was assigned", createReturned.getVersion() ); Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null ); assertNotNull( id ); assertEquals( newEntity.getId(), id ); Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true ); - id = manager.getIdField( newEntity.getId().getType(), fieldNull 
).toBlocking().lastOrDefault( null ); + id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null ); assertNull( id ); } - - @Test public void updateVersioning() { @@ -331,10 +321,10 @@ public class EntityCollectionManagerIT { Entity origEntity = new Entity( new SimpleId( "testUpdate" ) ); origEntity.setField( new StringField( "testField", "value" ) ); - ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); - Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null); + Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null ); // note its version UUID oldVersion = returned.getVersion(); @@ -345,7 +335,7 @@ public class EntityCollectionManagerIT { // partial update entity but we don't have version number Entity updateEntity = new Entity( origEntity.getId() ); updateEntity.setField( new StringField( "addedField", "other value" ) ); - manager.write( updateEntity ).toBlocking().lastOrDefault(null); + manager.write( updateEntity ).toBlocking().lastOrDefault( null ); // get entity now, it must have a new version returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null ); @@ -354,14 +344,14 @@ public class EntityCollectionManagerIT { assertNotNull( "A new version must be assigned", newVersion ); // new Version should be > old version - assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0); + assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 ); } @Test public void writeMultiget() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( 
context ); final int multigetSize = serializationFig.getMaxLoadSize(); @@ -381,10 +371,10 @@ public class EntityCollectionManagerIT { final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null ); - assertNotNull(entitySet); + assertNotNull( entitySet ); assertEquals( multigetSize, entitySet.size() ); - assertFalse(entitySet.isEmpty()); + assertFalse( entitySet.isEmpty() ); /** * Validate every element exists @@ -405,7 +395,7 @@ public class EntityCollectionManagerIT { @Test public void writeMultigetRepair() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); final int multigetSize = serializationFig.getMaxLoadSize(); @@ -444,7 +434,7 @@ public class EntityCollectionManagerIT { assertEquals( "Same entity returned", expected, returned.getEntity().get() ); - assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue()); + assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() ); } } @@ -452,7 +442,7 @@ public class EntityCollectionManagerIT { @Test( expected = IllegalArgumentException.class ) public void readTooLarge() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); final int multigetSize = serializationFig.getMaxLoadSize() + 1; @@ -474,7 +464,7 @@ public class EntityCollectionManagerIT { @Test public void testGetVersion() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = 
factory.createCollectionManager( context ); @@ -495,7 +485,7 @@ public class EntityCollectionManagerIT { VersionSet results = - manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last(); + manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last(); final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() ); @@ -515,7 +505,7 @@ public class EntityCollectionManagerIT { @Test public void testVersionLogWrite() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -529,10 +519,10 @@ public class EntityCollectionManagerIT { final UUID v1Version = v1Created.getVersion(); - final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last(); + final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); - final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId()); + final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() ); assertEquals( v1Created.getId(), version1Log.getEntityId() ); assertEquals( v1Version, version1Log.getVersion() ); assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() ); @@ -543,7 +533,7 @@ public class EntityCollectionManagerIT { final UUID v2Version = v2Created.getVersion(); - assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0); + assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 ); final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); @@ -560,7 +550,7 @@ public class EntityCollectionManagerIT { @Test public void testVersionLogUpdate() { - ApplicationScope context = 
new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -608,7 +598,7 @@ public class EntityCollectionManagerIT { @Test public void healthTest() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -632,7 +622,7 @@ public class EntityCollectionManagerIT { final Entity entity = EntityHelper.generateEntity( setSize ); //now we have one massive, entity, save it and retrieve it. - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -652,20 +642,21 @@ public class EntityCollectionManagerIT { SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" ); } + @Test public void invalidNameRepair() throws ConnectionException { //write an entity with a unique field - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); //if we add a second field we get a second entity that is the exact same. Is this expected? 
- final IntegerField expectedInteger = new IntegerField( "count", 5, true ); - // final StringField expectedString = new StringField( "yes", "fred", true ); + final IntegerField expectedInteger = new IntegerField( "count", 5, true ); + // final StringField expectedString = new StringField( "yes", "fred", true ); newEntity.setField( expectedInteger ); - // newEntity.setField( expectedString ); + // newEntity.setField( expectedString ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -677,23 +668,26 @@ public class EntityCollectionManagerIT { assertNotNull( "Id was assigned", createReturned.getId() ); assertNotNull( "Version was assigned", createReturned.getVersion() ); - FieldSet - fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last(); + FieldSet fieldResults = + manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ) + .toBlocking().last(); - assertEquals(1,fieldResults.size()); + assertEquals( 1, fieldResults.size() ); //verify the entity is correct. - assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned ); + assertEquals( "Same value", createReturned, + fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned ); //use the entity serializationStrategy to remove the entity data. 
//do a mark as one test, and a delete as another - entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute(); + entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute(); //try to load via the unique field, should have triggered repair - final FieldSet - results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last(); + final FieldSet results = + manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ) + .toBlocking().last(); //verify no entity returned @@ -701,37 +695,104 @@ public class EntityCollectionManagerIT { //user the unique serialization to verify it's been deleted from cassandra - UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() ); + UniqueValueSet uniqueValues = + uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() ); assertFalse( uniqueValues.iterator().hasNext() ); - } @Test public void testGetIdField() throws Exception { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); // create an entity of type "item" with a unique_id field value = 1 Entity entity1 = new Entity( new SimpleId( "item" ) ); - entity1.setField( new StringField( "unique_id", "1", true )); + entity1.setField( new StringField( "unique_id", "1", true ) ); manager.write( entity1 ).toBlocking().last(); - final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1")); - Id id = idObs.toBlocking().lastOrDefault(null); - assertEquals(entity1.getId(), id); + final Observable<Id> idObs = manager.getIdField( "item", new 
StringField( "unique_id", "1" ) ); + Id id = idObs.toBlocking().lastOrDefault( null ); + assertEquals( entity1.getId(), id ); // create an entity of type "deleted_item" with a unique_id field value = 1 Entity entity2 = new Entity( new SimpleId( "deleted_item" ) ); - entity2.setField( new StringField( "unique_id", "1", true )); + entity2.setField( new StringField( "unique_id", "1", true ) ); manager = factory.createCollectionManager( context ); manager.write( entity2 ).toBlocking().last(); - final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1")); - Id id2 = id2Obs.toBlocking().lastOrDefault(null); - assertEquals(entity2.getId(), id2); + final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) ); + Id id2 = id2Obs.toBlocking().lastOrDefault( null ); + assertEquals( entity2.getId(), id2 ); + } + + + @Test + public void writeGetVersionsDelete() { + + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + Entity entity = new Entity( new SimpleId( "test" ) ); + entity.setField( new IntegerField( "counter", 0 ) ); + + EntityCollectionManager manager = factory.createCollectionManager( context ); + + Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null ); + + assertNotNull( "Id was assigned", createReturned.getId() ); + + final int size = 200; + + final Id entityId = createReturned.getId(); + + List<UUID> versions = new ArrayList<>( size ); + versions.add( entity.getVersion() ); + + //write new versions + for ( int i = 1; i < size; i++ ) { + final Entity newEntity = new Entity( entityId ); + + final Entity returnedEntity = manager.write( newEntity ).toBlocking().last(); + + versions.add( returnedEntity.getVersion() ); + } + + + //now get our values, and load the latest version + + final Entity lastVersion = manager.load( entityId ).toBlocking().last(); + + //ensure the latest version is correct + assertEquals( 
versions.get( versions.size() - 1 ), lastVersion.getVersion() ); + + + // now ensure all versions are correct + final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last(); + + + assertEquals( "Same size expected", versions.size(), entries.size() ); + + for ( int i = 0; i < versions.size(); i++ ) { + assertEquals( versions.get( i ), entries.get( i ).getVersion() ); + } + + + //now get all the log versions, and delete them all we do it in 2+ batches to ensure we clean up as expected + manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) ) + .toBlocking().last(); + + + //now load them, there shouldn't be any versions + final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last(); + + assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() ); + + final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null ); + + //ensure the latest version is correct + assertNull( "Last version was deleted", postDeleteLastVersion ); } }
