Updated the data migration to also migrate version logs Updated the batch to use lambdas
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/72da01d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/72da01d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/72da01d5 Branch: refs/heads/two-dot-o-dev Commit: 72da01d52da94cfff6600e4a6dbc26f11e37d4b4 Parents: d145eb4 Author: Todd Nine <[email protected]> Authored: Tue Mar 31 15:04:47 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Tue Mar 31 15:04:47 2015 -0600 ---------------------------------------------------------------------- .../migration/MvccEntityDataMigrationImpl.java | 42 +++++++++--- .../index/impl/EsIndexBufferConsumerImpl.java | 72 ++++++-------------- 2 files changed, 52 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72da01d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java index 4551d5f..3168817 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java @@ -30,13 +30,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask; import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl; +import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; +import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl; import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.core.migration.data.DataMigrationException; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -72,25 +76,25 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> private final Keyspace keyspace; private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions; - private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final EntityVersionTaskFactory entityVersionCleanupFactory; private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3; - + private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; @Inject public MvccEntityDataMigrationImpl( final Keyspace keyspace, final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final EntityVersionTaskFactory entityVersionCleanupFactory, - final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3 - ) { - + final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) { this.keyspace = keyspace; this.allVersions = allVersions; - this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entityVersionCleanupFactory = entityVersionCleanupFactory; this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3; + this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; + this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; } @@ -189,11 +193,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> // time with // no TTL so that cleanup can clean up // older values - for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) { + for (final Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) { - UniqueValue written = new UniqueValueImpl( field, entityId, version ); + final UniqueValue written = new UniqueValueImpl( field, entityId, version ); - MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written ); + final MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written ); // merge into our @@ -202,6 +206,24 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope> totalBatch.mergeShallow( mb ); } + + //add all our log entries + final List<MvccLogEntry> logEntries = mvccLogEntrySerializationStrategy.load( message.scope, + message.entity.getId(), version, 1000 ); + + /** + * Migrate the log entry to the new format + */ + for(final MvccLogEntry entry: logEntries){ + + final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry ); + + totalBatch.mergeShallow( mb ); + } + + + + //schedule our cleanup task to clean up all the data final EntityVersionCleanupTask task = entityVersionCleanupFactory .getCleanupTask( message.scope, message.entity.getId(), version, false ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72da01d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 6d5a4d8..7e8d8f4 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.index.impl; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -50,8 +49,6 @@ import rx.Observable; import rx.Subscriber; import rx.Subscription; import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; import rx.schedulers.Schedulers; @@ -200,9 +197,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { } while ( true ); } - } ).doOnNext( new Action1<List<IndexOperationMessage>>() { - @Override - public void call( List<IndexOperationMessage> containerList ) { + } ).doOnNext( containerList -> { if ( containerList.size() == 0 ) { return; } @@ -214,16 +209,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { execute( containerList ); time.stop(); - } - } ) + } ) //ack after we process - .doOnNext( new Action1<List<IndexOperationMessage>>() { - @Override - public void call( final List<IndexOperationMessage> indexOperationMessages ) { - bufferQueue.ack( indexOperationMessages ); - //release so we know we've done processing - inFlight.addAndGet( -1 * indexOperationMessages.size() ); - } + .doOnNext( indexOperationMessages -> { + bufferQueue.ack( indexOperationMessages ); + //release so we know we've done processing + inFlight.addAndGet( -1 * indexOperationMessages.size() ); } ) .subscribeOn( Schedulers.newThread() ); @@ -240,54 +231,31 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { /** * Execute the request, check for errors, then re-init the batch for future use */ - private void execute(final List<IndexOperationMessage> operationMessages) { + private void execute( final List<IndexOperationMessage> operationMessages ) { - if (operationMessages == null || operationMessages.size() == 0) { + if ( operationMessages == null || operationMessages.size() == 0 ) { return; } //process and flatten all the messages to builder requests //batch shard operations into a bulk request - Observable.from( operationMessages ).flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() { - @Override - public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) { - final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() ); - final Observable<DeIndexRequest> deIndex = - Observable.from( indexOperationMessage.getDeIndexRequests() ); + Observable.from( operationMessages ).flatMap( indexOperationMessage -> { + final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() ); + final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() ); - indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() ); - indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() ); + indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() ); + indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() ); - return Observable.merge( index, deIndex ); - } + return Observable.merge( index, deIndex ); } ) - //collection all the operations into a single stream - .reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() { - @Override - public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder, - final BatchRequest batchRequest ) { - batchRequest.doOperation( client, bulkRequestBuilder ); - - return bulkRequestBuilder; - } - } ) - //send the request off to ES - .doOnNext( new Action1<BulkRequestBuilder>() { - @Override - public void call( final BulkRequestBuilder bulkRequestBuilder ) { - sendRequest( bulkRequestBuilder ); - } - } ).toBlocking().lastOrDefault(null); + //collection all the operations into a single stream + .collect( () -> initRequest(), ( bulkRequestBuilder, batchRequest ) -> { + batchRequest.doOperation( client, bulkRequestBuilder ); + } ) //send the request off to ES + .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).toBlocking().lastOrDefault( null ); //call back all futures - Observable.from(operationMessages) - .doOnNext(new Action1<IndexOperationMessage>() { - @Override - public void call(IndexOperationMessage operationMessage) { - operationMessage.getFuture().done(); - } - }) - .toBlocking().lastOrDefault(null); + Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() ).toBlocking().lastOrDefault( null ); }
