http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java ---------------------------------------------------------------------- diff --cc stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java index 1c04da9,e7d4fc4..77fd220 --- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java @@@ -123,6 -126,8 +126,7 @@@ public abstract class AbstractService i this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), "importEntities.get"); this.entitiesParallelGetTimer = metricsFactory.getTimer( this.getClass(),"importEntitiesP.get" ); this.invokeTimer = metricsFactory.getTimer( this.getClass(),"service.invoke" ); - + this.cacheFactory = injector.getInstance( CacheFactory.class ); } @@@ -447,45 -456,42 +455,42 @@@ * @param results */ private void importEntitiesParallel(final ServiceRequest request, final Results results ) { - Timer.Context timer = entitiesParallelGetTimer.time(); - try { - //create our tuples - final Observable<EntityTuple> tuples = Observable.create(new Observable.OnSubscribe<EntityTuple>() { - @Override - public void call(final Subscriber<? super EntityTuple> subscriber) { - subscriber.onStart(); - - final List<Entity> entities = results.getEntities(); - final int size = entities.size(); - for (int i = 0; i < size && !subscriber.isUnsubscribed(); i++) { - subscriber.onNext(new EntityTuple(i, entities.get(i))); - } + - subscriber.onCompleted(); + //create our tuples + final Observable<EntityTuple> tuples = Observable.create(new Observable.OnSubscribe<EntityTuple>() { + @Override + public void call(final Subscriber<? super EntityTuple> subscriber) { + subscriber.onStart(); + + final List<Entity> entities = results.getEntities(); + final int size = entities.size(); + for (int i = 0; i < size && !subscriber.isUnsubscribed(); i++) { + subscriber.onNext(new EntityTuple(i, entities.get(i))); } - }); - + subscriber.onCompleted(); + } + }); - //now process them in parallel up to 10 threads + //now process them in parallel up to 10 threads - tuples.flatMap(tuple -> { - //map the entity into the tuple - return Observable.just(tuple).doOnNext(parallelTuple -> { - //import the entity and set it at index - try { + Observable tuplesObservable = tuples.flatMap(tuple -> { + //map the entity into the tuple + return Observable.just(tuple).doOnNext(parallelTuple -> { + //import the entity and set it at index + try { - final Entity imported = importEntity(request, parallelTuple.entity); + final Entity imported = importEntity(request, parallelTuple.entity); - if (imported != null) { - results.setEntity(parallelTuple.index, imported); - } - } catch (Exception e) { - throw new RuntimeException(e); + if (imported != null) { + results.setEntity(parallelTuple.index, imported); } - }).subscribeOn(rxScheduler); - }, rxSchedulerFig.getImportThreads()).toBlocking().lastOrDefault(null); - } finally { - timer.stop(); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }).subscribeOn(rxScheduler); + }, rxSchedulerFig.getImportThreads()); + + ObservableTimer.time(tuplesObservable, entitiesParallelGetTimer).toBlocking().lastOrDefault(null); }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java index fe1edbf,fe1edbf..0000000 deleted file mode 100644,100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java +++ /dev/null @@@ -1,272 -1,272 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.usergrid.tools; -- -- --import java.nio.ByteBuffer; --import java.util.Stack; --import java.util.UUID; --import java.util.concurrent.Callable; --import java.util.concurrent.ExecutorService; --import java.util.concurrent.Executors; --import java.util.concurrent.Future; -- --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; --import org.apache.usergrid.persistence.DynamicEntity; --import org.apache.usergrid.persistence.IndexBucketLocator; --import org.apache.usergrid.persistence.IndexBucketLocator.IndexType; --import org.apache.usergrid.persistence.cassandra.EntityManagerImpl; --import org.apache.usergrid.utils.UUIDUtils; -- --import org.apache.commons.cli.CommandLine; --import org.apache.commons.cli.Option; --import org.apache.commons.cli.OptionBuilder; --import org.apache.commons.cli.Options; -- --import me.prettyprint.hector.api.Keyspace; --import me.prettyprint.hector.api.beans.DynamicComposite; --import me.prettyprint.hector.api.mutation.Mutator; -- --import static me.prettyprint.hector.api.factory.HFactory.createMutator; --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX; --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key; --import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode; --import static org.apache.usergrid.persistence.cassandra.Serializers.*; -- -- --/** -- * A utility to insert entities into the em for benchmarking -- * -- * @author tnine -- */ --public class EntityInsertBenchMark extends ToolBase { -- -- -- private static final Logger logger = LoggerFactory.getLogger( EntityInsertBenchMark.class ); -- -- -- @Override -- @SuppressWarnings("static-access") -- public Options createOptions() { -- -- Option hostOption = -- OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ).withDescription( "Cassandra host" ) -- .create( "host" ); -- -- Option countOption = -- OptionBuilder.withArgName( "count" ).hasArg().isRequired( true ).withDescription( "Number of records" ) -- .create( "count" ); -- -- Option appIdOption = OptionBuilder.withArgName( "appId" ).hasArg().isRequired( true ) -- .withDescription( "Application Id to use" ).create( "appId" ); -- -- Option workerOption = OptionBuilder.withArgName( "workers" ).hasArg().isRequired( true ) -- .withDescription( "Number of workers to use" ).create( "workers" ); -- -- -- Options options = new Options(); -- options.addOption( hostOption ); -- options.addOption( countOption ); -- options.addOption( appIdOption ); -- options.addOption( workerOption ); -- -- return options; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see -- * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine) -- */ -- @Override -- public void runTool( CommandLine line ) throws Exception { -- startSpring(); -- -- logger.info( "Starting entity cleanup" ); -- -- int workerSize = Integer.parseInt( line.getOptionValue( "workers" ) ); -- -- -- ExecutorService executors = Executors.newFixedThreadPool( workerSize ); -- -- int count = Integer.parseInt( line.getOptionValue( "count" ) ); -- -- int size = count / workerSize; -- -- UUID appId = UUID.fromString( line.getOptionValue( "appId" ) ); -- -- Stack<Future<Void>> futures = new Stack<Future<Void>>(); -- -- for ( int i = 0; i < workerSize; i++ ) { -- futures.push( executors.submit( new InsertWorker( i, size, appId ) ) ); -- } -- -- System.out.println( "Waiting for workers to complete insertion" ); -- -- /** -- * Wait for all tasks to complete -- */ -- while ( !futures.isEmpty() ) { -- futures.pop().get(); -- } -- -- System.out.println( "All workers completed insertion" ); -- } -- -- -- private class InsertWorker implements Callable<Void> { -- -- private int count; -- -- private int workerNumber; -- -- private UUID appId; -- -- -- private InsertWorker( int workerNumber, int count, UUID appId ) { -- this.workerNumber = workerNumber; -- this.count = count; -- this.appId = appId; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see java.util.concurrent.Callable#call() -- */ -- @Override -- public Void call() throws Exception { -- -- Keyspace ko = EntityInsertBenchMark.this.cass.getApplicationKeyspace( appId ); -- EntityManagerImpl em = ( EntityManagerImpl ) emf.getEntityManager( appId ); -- IndexBucketLocator indexBucketLocator = em.getIndexBucketLocator(); -- -- for ( int i = 0; i < count; i++ ) { -- -- Mutator<ByteBuffer> m = createMutator( ko, be ); -- -- DynamicEntity dynEntity = new DynamicEntity(); -- dynEntity.setType( "test" ); -- dynEntity.setUuid( UUIDUtils.newTimeUUID() ); -- -- String value = new StringBuilder().append( workerNumber ).append( "-" ).append( i ).toString(); -- -- -- String bucketId = -- indexBucketLocator.getBucket( appId, IndexType.COLLECTION, dynEntity.getUuid(), "test" ); -- -- Object index_name = key( appId, "tests", "test", bucketId ); -- -- IndexEntry entry = new IndexEntry( dynEntity.getUuid(), "test", value, UUIDUtils.newTimeUUID() ); -- -- addInsertToMutator( m, ENTITY_INDEX, index_name, entry.getIndexComposite(), null, -- System.currentTimeMillis() ); -- -- UniqueIndexer indexer = new UniqueIndexer( m ); -- indexer.writeIndex( appId, "tests", dynEntity.getUuid(), "test", value ); -- // write this to the direct collection index -- -- m.execute(); -- -- if ( i % 100 == 0 ) { -- System.out.println( -- String.format( "%s : Written %d of %d", Thread.currentThread().getName(), i, count ) ); -- } -- } -- -- return null; -- } -- } -- -- -- private class UniqueIndexer { -- -- private Mutator<ByteBuffer> mutator; -- -- -- /** -- * @param mutator -- */ -- public UniqueIndexer( Mutator<ByteBuffer> mutator ) { -- super(); -- this.mutator = mutator; -- } -- -- -- private void writeIndex( UUID applicationId, String collectionName, UUID entityId, String propName, -- Object entityValue ) { -- -- Object rowKey = key( applicationId, collectionName, propName, entityValue ); -- -- addInsertToMutator( mutator, ENTITY_UNIQUE, rowKey, entityId, null, System.currentTimeMillis() ); -- } -- } -- -- -- public static class IndexEntry { -- private final byte code; -- private String path; -- private final Object value; -- private final UUID timestampUuid; -- private final UUID entityId; -- -- -- public IndexEntry( UUID entityId, String path, Object value, UUID timestampUuid ) { -- this.entityId = entityId; -- this.path = path; -- this.value = value; -- code = indexValueCode( value ); -- this.timestampUuid = timestampUuid; -- } -- -- -- public String getPath() { -- return path; -- } -- -- -- public void setPath( String path ) { -- this.path = path; -- } -- -- -- public Object getValue() { -- return value; -- } -- -- -- public byte getValueCode() { -- return code; -- } -- -- -- public UUID getTimestampUuid() { -- return timestampUuid; -- } -- -- -- public DynamicComposite getIndexComposite() { -- return new DynamicComposite( code, value, entityId, timestampUuid ); -- } -- } --} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java index 1b54495,1b54495..0000000 deleted file mode 100644,100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java +++ /dev/null @@@ -1,353 -1,353 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.usergrid.tools; -- -- --import java.nio.ByteBuffer; --import java.util.ArrayList; --import java.util.List; --import java.util.Stack; --import java.util.UUID; --import java.util.concurrent.Callable; --import java.util.concurrent.ExecutorService; --import java.util.concurrent.Executors; --import java.util.concurrent.Future; --import java.util.concurrent.TimeUnit; -- --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; --import org.springframework.util.Assert; --import org.apache.usergrid.persistence.IndexBucketLocator; --import org.apache.usergrid.persistence.IndexBucketLocator.IndexType; --import org.apache.usergrid.persistence.cassandra.EntityManagerImpl; -- --import org.apache.commons.cli.CommandLine; --import org.apache.commons.cli.Option; --import org.apache.commons.cli.OptionBuilder; --import org.apache.commons.cli.Options; -- --import com.yammer.metrics.Metrics; --import com.yammer.metrics.core.MetricPredicate; --import com.yammer.metrics.core.Timer; --import com.yammer.metrics.core.TimerContext; --import com.yammer.metrics.reporting.ConsoleReporter; -- --import me.prettyprint.hector.api.Keyspace; --import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality; --import me.prettyprint.hector.api.beans.DynamicComposite; --import me.prettyprint.hector.api.beans.HColumn; --import me.prettyprint.hector.api.beans.Row; --import me.prettyprint.hector.api.beans.Rows; --import me.prettyprint.hector.api.factory.HFactory; --import me.prettyprint.hector.api.query.MultigetSliceQuery; --import me.prettyprint.hector.api.query.QueryResult; -- --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX; --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key; --import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode; --import static org.apache.usergrid.utils.ConversionUtils.bytebuffers; --import static org.apache.usergrid.persistence.cassandra.Serializers.*; -- -- --/** -- * A utility to insert entities into the em for benchmarking -- * -- * @author tnine -- */ --public class EntityReadBenchMark extends ToolBase { -- -- -- -- private static final Logger logger = LoggerFactory.getLogger( EntityReadBenchMark.class ); -- -- private final Timer queryReads = -- Metrics.newTimer( ReadWorker.class, "entity", TimeUnit.MILLISECONDS, TimeUnit.SECONDS ); -- -- private final Timer dictReads = -- Metrics.newTimer( ReadWorker.class, "dictionary", TimeUnit.MILLISECONDS, TimeUnit.SECONDS ); -- -- private static final String TYPE_DICTIONARY = "dict"; -- private static final String TYPE_ENTITY = "entity"; -- -- -- @Override -- @SuppressWarnings("static-access") -- public Options createOptions() { -- -- Option hostOption = -- OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ).withDescription( "Cassandra host" ) -- .create( "host" ); -- -- Option countOption = -- OptionBuilder.withArgName( "count" ).hasArg().isRequired( true ).withDescription( "Number of records" ) -- .create( "count" ); -- -- Option appIdOption = OptionBuilder.withArgName( "appId" ).hasArg().isRequired( true ) -- .withDescription( "Application Id to use" ).create( "appId" ); -- -- Option workerOption = OptionBuilder.withArgName( "workers" ).hasArg().isRequired( true ) -- .withDescription( "Number of workers to use" ).create( "workers" ); -- -- -- Option typeOption = OptionBuilder.withArgName( "type" ).hasArg().isRequired( true ) -- .withDescription( "Read type to use, 'dict' or 'entity'" ).create( "type" ); -- -- Options options = new Options(); -- options.addOption( hostOption ); -- options.addOption( countOption ); -- options.addOption( appIdOption ); -- options.addOption( workerOption ); -- options.addOption( typeOption ); -- -- return options; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see -- * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine) -- */ -- @Override -- public void runTool( CommandLine line ) throws Exception { -- startSpring(); -- -- logger.info( "Starting entity cleanup" ); -- -- int workerSize = Integer.parseInt( line.getOptionValue( "workers" ) ); -- -- ExecutorService executors = Executors.newFixedThreadPool( workerSize ); -- -- int count = Integer.parseInt( line.getOptionValue( "count" ) ); -- -- int size = count / workerSize; -- -- UUID appId = UUID.fromString( line.getOptionValue( "appId" ) ); -- -- System.out.println( "Querying unique properties in the search index" ); -- -- -- final ConsoleReporter reporter = -- new ConsoleReporter( Metrics.defaultRegistry(), System.out, MetricPredicate.ALL ); -- -- //print every 30 seconds -- reporter.start( 30, TimeUnit.SECONDS ); -- -- Stack<Future<Void>> futures = new Stack<Future<Void>>(); -- -- -- String type = line.getOptionValue( "type" ); -- -- for ( int i = 0; i < workerSize; i++ ) { -- -- ReadWorker worker = null; -- -- -- if ( TYPE_ENTITY.equals( type ) ) { -- worker = new IndexReadWorker( i, size, appId ); -- } -- else if ( TYPE_DICTIONARY.equals( type ) ) { -- worker = new DictReadWorker( i, size, appId ); -- } -- else { -- throw new IllegalArgumentException( "You must specifiy the 'type' option" ); -- } -- -- futures.push( executors.submit( worker ) ); -- } -- -- -- System.out.println( "Waiting for index read workers to complete" ); -- -- /** -- * Wait for all tasks to complete -- */ -- while ( !futures.isEmpty() ) { -- futures.pop().get(); -- } -- -- -- System.out.println( "All workers completed reading" ); -- -- -- //print the report -- reporter.run(); -- } -- -- -- private abstract class ReadWorker implements Callable<Void> { -- -- protected int count; -- -- protected int workerNumber; -- -- protected UUID appId; -- -- -- private ReadWorker( int workerNumber, int count, UUID appId ) throws Exception { -- this.workerNumber = workerNumber; -- this.count = count; -- this.appId = appId; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see java.util.concurrent.Callable#call() -- */ -- @Override -- public Void call() throws Exception { -- -- -- for ( int i = 0; i < count; i++ ) { -- -- String value = new StringBuilder().append( workerNumber ).append( "-" ).append( i ).toString(); -- -- -- doRead( value ); -- } -- -- return null; -- } -- -- -- protected abstract void doRead( String value ) throws Exception; -- } -- -- -- private class IndexReadWorker extends ReadWorker { -- -- private Keyspace keyspace; -- private IndexBucketLocator indexBucketLocator = null; -- -- -- private IndexReadWorker( int workerNumber, int count, UUID appId ) throws Exception { -- super( workerNumber, count, appId ); -- keyspace = EntityReadBenchMark.this.cass.getApplicationKeyspace( appId ); -- indexBucketLocator = ( ( EntityManagerImpl ) EntityReadBenchMark.this.emf.getEntityManager( appId ) ) -- .getIndexBucketLocator(); -- } -- -- -- /* (non-Javadoc) -- * @see org.apache.usergrid.tools.EntityReadBenchMark.ReadWorker#doRead() -- */ -- @Override -- protected void doRead( String value ) throws Exception { -- TimerContext timer = queryReads.time(); -- -- Assert.isTrue( read( value ) ); -- -- timer.stop(); -- } -- -- -- private boolean read( String value ) { -- -- -- List<String> buckets = indexBucketLocator.getBuckets( appId, IndexType.UNIQUE, "tests" ); -- -- List<Object> cassKeys = new ArrayList<Object>( buckets.size() ); -- -- Object keyPrefix = key( appId, "tests", "test" ); -- -- for ( String bucket : buckets ) { -- cassKeys.add( key( keyPrefix, bucket ) ); -- } -- -- MultigetSliceQuery<ByteBuffer, DynamicComposite, ByteBuffer> multiget = -- HFactory.createMultigetSliceQuery( keyspace, be, dce, -- be ); -- -- multiget.setColumnFamily( ENTITY_INDEX.getColumnFamily() ); -- multiget.setKeys( bytebuffers( cassKeys ) ); -- -- -- DynamicComposite start = new DynamicComposite( indexValueCode( value ), value ); -- -- DynamicComposite finish = new DynamicComposite( indexValueCode( value ) ); -- finish.addComponent( 1, value, ComponentEquality.GREATER_THAN_EQUAL ); -- -- -- multiget.setRange( start, finish, false, 1 ); -- QueryResult<Rows<ByteBuffer, DynamicComposite, ByteBuffer>> results = multiget.execute(); -- -- // search for a column, if one exists, we've found the entity -- for ( Row<ByteBuffer, DynamicComposite, ByteBuffer> row : results.get() ) { -- if ( row.getColumnSlice().getColumns().size() > 0 ) { -- return true; -- } -- } -- -- return false; -- } -- } -- -- -- private class DictReadWorker extends ReadWorker { -- -- -- UniqueIndexer indexer; -- -- -- private DictReadWorker( int workerNumber, int count, UUID appId ) throws Exception { -- super( workerNumber, count, appId ); -- Keyspace ko = EntityReadBenchMark.this.cass.getApplicationKeyspace( appId ); -- indexer = new UniqueIndexer( ko ); -- } -- -- -- /* (non-Javadoc) -- * @see org.apache.usergrid.tools.EntityReadBenchMark.ReadWorker#doRead() -- */ -- @Override -- protected void doRead( String value ) throws Exception { -- -- TimerContext timer = dictReads.time(); -- -- Assert.isTrue( indexer.existsInIndex( appId, "tests", "test", value ) ); -- -- timer.stop(); -- } -- } -- -- -- private class UniqueIndexer { -- -- private Keyspace keyspace; -- public UniqueIndexer( Keyspace keyspace ) { -- super(); -- this.keyspace = keyspace; -- } -- -- -- private boolean existsInIndex( UUID applicationId, String collectionName, String propName, Object entityValue ) -- throws Exception { -- Object rowKey = key( applicationId, collectionName, propName, entityValue ); -- -- -- List<HColumn<ByteBuffer, ByteBuffer>> cols = -- cass.getColumns( keyspace, ENTITY_UNIQUE, rowKey, null, null, 2, false ); -- -- -- return cols.size() > 0; -- } -- } --} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/Export.java index eb616a8,eb616a8..bc83921 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java @@@ -26,6 -26,6 +26,7 @@@ import java.util.Map.Entry import java.util.Set; import java.util.UUID; ++import org.apache.usergrid.persistence.*; import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; @@@ -33,19 -33,19 +34,12 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import org.apache.usergrid.management.OrganizationInfo; import org.apache.usergrid.management.UserInfo; --import org.apache.usergrid.persistence.ConnectionRef; --import org.apache.usergrid.persistence.Entity; --import org.apache.usergrid.persistence.EntityManager; --import org.apache.usergrid.persistence.index.query.Query; --import org.apache.usergrid.persistence.Results; import org.apache.usergrid.tools.bean.ExportOrg; import org.apache.usergrid.utils.JsonUtils; import org.apache.commons.cli.CommandLine; import com.google.common.collect.BiMap; --import org.apache.usergrid.persistence.SimpleEntityRef; --import org.apache.usergrid.persistence.index.query.Query.Level; public class Export extends ExportingToolBase { @@@ -175,7 -175,7 +169,7 @@@ Query query = new Query(); query.setLimit( MAX_ENTITY_FETCH ); -- query.setResultsLevel( Level.ALL_PROPERTIES ); ++ query.setResultsLevel( Query.Level.ALL_PROPERTIES ); Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query ); @@@ -239,7 -239,7 +233,7 @@@ // Start collection array. jg.writeStartArray(); -- Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Level.IDS, false ); ++ Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Query.Level.IDS, false ); List<UUID> entityIds = collectionMembers.getIds(); @@@ -307,13 -307,13 +301,13 @@@ jg.writeFieldName( connectionType ); jg.writeStartArray(); -- Results results = em.getConnectedEntities( -- entity, connectionType, null, Level.IDS ); ++ Results results = em.getTargetEntities( ++ entity, connectionType, null, Query.Level.IDS ); List<ConnectionRef> connections = results.getConnections(); for ( ConnectionRef connectionRef : connections ) { -- jg.writeObject( connectionRef.getConnectedEntity().getUuid() ); ++ jg.writeObject( connectionRef.getTargetRefs().getUuid() ); } jg.writeEndArray(); @@@ -330,8 -330,8 +324,8 @@@ * write entity_id : { "collectionName" : [ids] * } * } -- * -- * ++ * ++ * * { * entity_id : * { collection_name : @@@ -348,9 -348,9 +342,9 @@@ * ] * } * } -- * ++ * * http://jackson.codehaus.org/1.8.0/javadoc/org/codehaus/jackson/JsonGenerator.html -- * ++ * * *- * List<ConnectedEntityRef> connections = em.getConnections(entityId, query); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/Import.java index 01313e3,01313e3..0861a0a --- a/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java @@@ -25,6 -25,6 +25,7 @@@ import java.util.Map import java.util.Map.Entry; import java.util.UUID; ++import com.google.common.base.Optional; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; @@@ -366,10 -366,10 +367,10 @@@ public class Import extends ToolBase // Retrieve the namepsace for this collection. It's part of the name String applicationName = getApplicationFromColllection( collectionFileName ); -- UUID appId = emf.lookupApplication( applicationName ); ++ Optional<UUID> appId = emf.lookupApplication( applicationName ); //no org in path, this is a pre public beta so we need to create the new path -- if ( appId == null && !applicationName.contains( "/" ) ) { ++ if ( !appId.isPresent() && !applicationName.contains( "/" ) ) { String fileName = collectionFileName.replace( "collections", "application" ); File applicationFile = new File( importDir, fileName ); @@@ -413,8 -413,8 +414,8 @@@ } -- if ( appId == null ) { -- logger.error( "Unable to find application with name {}. Skipping collections", appId ); ++ if ( !appId.isPresent() ) { ++ logger.error( "Unable to find application with name {}. Skipping collections", applicationName ); return; } @@@ -427,7 -427,7 +428,7 @@@ jp.nextToken(); // START_OBJECT this is the outter hashmap -- EntityManager em = emf.getEntityManager( appId ); ++ EntityManager em = emf.getEntityManager( appId.get() ); while ( jp.nextToken() != JsonToken.END_OBJECT ) { importEntitysStuff( jp, em ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java index 5c35e85,5c35e85..0000000 deleted file mode 100644,100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java +++ /dev/null @@@ -1,184 -1,184 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.usergrid.tools; -- -- --import java.util.Collection; --import java.util.Collections; --import java.util.Map; --import java.util.Map.Entry; --import java.util.Set; --import java.util.UUID; -- --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; --import org.apache.usergrid.persistence.EntityManager; --import org.apache.usergrid.utils.UUIDUtils; -- --import org.apache.commons.cli.CommandLine; --import org.apache.commons.cli.Option; --import org.apache.commons.cli.OptionBuilder; --import org.apache.commons.cli.Options; --import org.apache.usergrid.persistence.EntityManagerFactory; --import org.apache.usergrid.persistence.EntityRef; -- -- --/** -- * Index rebuild utility for Usergrid. Can be used to rebuild the index for a specific -- * application, a specific application's collection or for an entire Usergrid system. -- */ --public class IndexRebuild extends ToolBase { -- -- private static final String APPLICATION_ARG = "app"; -- -- private static final String COLLECTION_ARG = "col"; -- -- private static final String ALL_ARG = "all"; -- -- private static final int PAGE_SIZE = 100; -- -- -- private static final Logger logger = LoggerFactory.getLogger( IndexRebuild.class ); -- -- -- @Override -- @SuppressWarnings("static-access") -- public Options createOptions() { -- -- Option hostOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ) -- .withDescription( "Cassandra host" ).create( "host" ); -- -- Option esHostsOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ) -- .withDescription( "ElasticSearch host" ).create( "eshost" ); -- -- Option esClusterOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ) -- .withDescription( "ElasticSearch cluster name" ).create( "escluster" ); -- -- Option appOpt = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( false ) -- .withDescription( "Application id or app name" ).create( APPLICATION_ARG ); -- -- Option collOpt = OptionBuilder.withArgName( COLLECTION_ARG ).hasArg().isRequired( false ) -- .withDescription( "Collection name" ).create( COLLECTION_ARG ); -- -- Option allOpt = OptionBuilder.withType( Boolean.class ) -- .withArgName( ALL_ARG ).hasArg().isRequired( false ) -- .withDescription( "True to reindex all application" ).create( ALL_ARG ); -- -- Options options = new Options(); -- options.addOption( hostOpt ); -- options.addOption( esHostsOpt ); -- options.addOption( esClusterOpt ); -- options.addOption( appOpt ); -- options.addOption( collOpt ); -- options.addOption( allOpt ); -- -- return options; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see -- * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine) -- */ -- @Override -- public void runTool( CommandLine line ) throws Exception { -- startSpring(); -- -- logger.info( "Starting index rebuild" ); -- -- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() { -- -- @Override -- public void onProgress(EntityRef entity) { -- logger.info("Indexing entity {}:{}", entity.getType(), entity.getUuid()); -- } -- -- @Override -- public long getWriteDelayTime() { -- return 100; -- } -- }; -- -- emf.rebuildInternalIndexes( po ); -- emf.refreshIndex(); -- -- if ( line.getOptionValue("all") != null && line.getOptionValue("all").equalsIgnoreCase("true") ) { -- emf.rebuildAllIndexes( po ); -- -- } else if ( line.getOptionValue( APPLICATION_ARG ) != null ) { -- -- // Goes through each app id specified -- for ( UUID appId : getAppIds( line ) ) { -- -- logger.info( "Reindexing for app id: {}", appId ); -- Set<String> collections = getCollections( line, appId ); -- -- for ( String collection : collections ) { -- emf.rebuildCollectionIndex( appId, collection, po ); -- emf.refreshIndex(); -- } -- } -- -- } else { -- -- Map<String, UUID> ids = emf.getApplications(); -- System.out.println( "Printing all apps" ); -- for ( Entry<String, UUID> entry : ids.entrySet() ) { -- System.out.println( entry.getKey() + " appid=" + entry.getValue() ); -- } -- } -- -- logger.info( "Finished index rebuild" ); -- } -- -- -- /** Get all app id */ -- private Collection<UUID> getAppIds( CommandLine line ) throws Exception { -- -- String appId = line.getOptionValue( APPLICATION_ARG ); -- -- Map<String, UUID> ids = emf.getApplications(); -- -- if ( appId != null ) { -- UUID id = UUIDUtils.tryExtractUUID( appId ); -- if ( id == null ) { -- logger.debug("Got applications: " + ids ); -- id = emf.getApplications().get( appId ); -- } -- return Collections.singleton( id ); -- } -- -- return ids.values(); -- } -- -- -- /** Get collection names. If none are specified, all are returned */ -- private Set<String> getCollections( CommandLine line, UUID appId ) throws Exception { -- -- String passedName = line.getOptionValue( COLLECTION_ARG ); -- -- if ( passedName != null ) { -- return Collections.singleton( passedName ); -- } -- -- EntityManager em = emf.getEntityManager( appId ); -- -- return em.getApplicationCollections(); -- } --} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java index 7b18ccc,7b18ccc..0000000 deleted file mode 100644,100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java +++ /dev/null @@@ -1,119 -1,119 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.usergrid.tools; -- -- --import java.nio.ByteBuffer; --import java.util.List; --import java.util.Map; --import java.util.UUID; -- --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --import org.apache.commons.cli.CommandLine; --import org.apache.commons.cli.Options; --import org.apache.log4j.Level; --import org.apache.log4j.LogManager; -- --import org.apache.usergrid.management.OrganizationInfo; -- --import com.google.common.collect.BiMap; -- --import me.prettyprint.cassandra.serializers.ByteBufferSerializer; --import me.prettyprint.hector.api.Keyspace; --import me.prettyprint.hector.api.mutation.Mutator; -- --import static me.prettyprint.hector.api.factory.HFactory.createMutator; --import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; --import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute; --import static org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF; --import static org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT; -- -- --public class RepairingMismatchedApplicationMetadata extends ToolBase { -- -- public static final ByteBufferSerializer be = new ByteBufferSerializer(); -- -- private static final Logger logger = LoggerFactory.getLogger( RepairingMismatchedApplicationMetadata.class ); -- -- -- @Override -- public Options createOptions() { -- Options options = super.createOptions(); -- return options; -- } -- -- -- @Override -- public void runTool( CommandLine line ) throws Exception { -- startSpring(); -- -- //sucks, but it's not picking up the configuration -- LogManager.getLogger( RepairingMismatchedApplicationMetadata.class ).setLevel( Level.INFO ); -- -- UUID orgId = null; -- List<OrganizationInfo> orgs; -- -- final int size = 1000; -- -- -- do { -- orgs = managementService.getOrganizations( orgId, size ); -- -- -- for ( OrganizationInfo org : orgs ) { -- -- orgId = org.getUuid(); -- -- logger.info( "Auditing org {}", org.getName() ); -- -- try { -- BiMap<UUID, String> apps = managementService.getApplicationsForOrganization( org.getUuid() ); -- -- -- for ( Map.Entry<UUID, String> app : apps.entrySet() ) { -- -- logger.info( "Auditing org {} app {}", org.getName(), app.getValue() ); -- -- UUID applicationId = emf.lookupApplication( app.getValue() ); -- if ( applicationId == null ) { -- String appName = app.getValue(); -- Keyspace ko = cass.getSystemKeyspace(); -- Mutator<ByteBuffer> m = createMutator( ko, be ); -- long timestamp = cass.createTimestamp(); -- addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_UUID, app.getKey(), timestamp ); -- addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_NAME, appName, timestamp ); -- batchExecute( m, RETRY_COUNT ); -- logger.info( "Repairing alias with app uuid {}, and name {}", app.getKey(), -- app.getValue() ); -- } -- } -- } -- catch ( Exception e ) { -- logger.error( "Unable to process applications for organization {}", org, e ); -- } -- } -- } -- while ( orgs != null && orgs.size() == size ); -- -- logger.info( "Completed repairing aliases" ); -- Thread.sleep( 1000 * 60 ); -- } --} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java index aa33e49,aa33e49..a78143b --- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java @@@ -181,7 -181,7 +181,7 @@@ public abstract class ToolBase Setup setup = ( (CpEntityManagerFactory) emf ).getSetup(); logger.info( "Setting up Usergrid schema" ); -- setup.init(); ++ setup.initSubsystems(); logger.info( "Usergrid schema setup" ); logger.info( "Setting up Usergrid management services" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java ---------------------------------------------------------------------- diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java index 0ae45d6,0ae45d6..0000000 deleted file mode 100644,100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java +++ /dev/null @@@ -1,386 -1,386 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.usergrid.tools; -- -- --import java.nio.ByteBuffer; --import java.util.ArrayList; --import java.util.HashMap; --import java.util.HashSet; --import java.util.List; --import java.util.Map; --import java.util.Map.Entry; --import java.util.Set; --import java.util.UUID; -- --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; --import org.apache.usergrid.management.ApplicationInfo; --import org.apache.usergrid.persistence.Entity; --import org.apache.usergrid.persistence.EntityManager; --import org.apache.usergrid.persistence.EntityManagerFactory; --import org.apache.usergrid.persistence.index.query.Identifier; --import org.apache.usergrid.persistence.IndexBucketLocator; --import org.apache.usergrid.persistence.IndexBucketLocator.IndexType; --import org.apache.usergrid.persistence.cassandra.CassandraService; --import org.apache.usergrid.persistence.cassandra.EntityManagerImpl; --import org.apache.usergrid.persistence.cassandra.index.IndexScanner; --import org.apache.usergrid.persistence.entities.Application; --import org.apache.usergrid.persistence.query.ir.result.ScanColumn; --import org.apache.usergrid.persistence.query.ir.result.SliceIterator; --import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser; --import org.apache.usergrid.persistence.schema.CollectionInfo; -- --import org.apache.commons.cli.CommandLine; --import org.apache.commons.cli.Option; --import org.apache.commons.cli.OptionBuilder; --import org.apache.commons.cli.Options; -- --import me.prettyprint.hector.api.Keyspace; --import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality; --import me.prettyprint.hector.api.beans.DynamicComposite; --import me.prettyprint.hector.api.beans.HColumn; --import me.prettyprint.hector.api.mutation.Mutator; -- --import static me.prettyprint.hector.api.factory.HFactory.createMutator; --import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS; --import static org.apache.usergrid.persistence.Schema.getDefaultSchema; --import org.apache.usergrid.persistence.SimpleEntityRef; --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX; --import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator; --import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key; --import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT; --import static org.apache.usergrid.utils.CompositeUtils.setEqualityFlag; --import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros; --import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID; --import static org.apache.usergrid.persistence.cassandra.Serializers.*; -- -- --/** -- * This is a utility to audit all available entity ids in the secondary index. It then checks to see if any index value -- * is not present in the Entity_Index_Entries. If it is not, the value from the index is removed, and a forced re-index -- * is triggered -- * <p/> -- * USERGRID-323 -- * <p/> -- * <p/> -- * UniqueIndexCleanup -app [appid] -col [collectionname] -- * -- * @author tnine -- */ --public class UniqueIndexCleanup extends ToolBase { -- -- /** -- * -- */ -- private static final int PAGE_SIZE = 100; -- -- -- -- private static final Logger logger = LoggerFactory.getLogger( UniqueIndexCleanup.class ); -- -- /** -- * -- */ -- private static final String APPLICATION_ARG = "app"; -- -- /** -- * -- */ -- private static final String COLLECTION_ARG = "col"; -- -- -- @Override -- @SuppressWarnings("static-access") -- public Options createOptions() { -- -- -- Options options = new Options(); -- -- Option hostOption = -- OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ).withDescription( "Cassandra host" ) -- .create( "host" ); -- -- -- options.addOption( hostOption ); -- -- -- Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( false ) -- .withDescription( "application id or app name" ).create( APPLICATION_ARG ); -- -- -- options.addOption( appOption ); -- -- Option collectionOption = OptionBuilder.withArgName( COLLECTION_ARG ).hasArg().isRequired( false ) -- .withDescription( "colleciton name" ).create( COLLECTION_ARG ); -- -- options.addOption( collectionOption ); -- -- return options; -- } -- -- -- /* -- * (non-Javadoc) -- * -- * @see -- * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine) -- */ -- @Override -- public void runTool( CommandLine line ) throws Exception { -- startSpring(); -- -- logger.info( "Starting entity cleanup" ); -- -- Map<String, UUID> apps = getApplications( emf, line ); -- -- -- for ( Entry<String, UUID> app : apps.entrySet() ) { -- -- logger.info( "Starting cleanup for app {}", app.getKey() ); -- -- UUID applicationId = app.getValue(); -- EntityManagerImpl em = ( EntityManagerImpl ) emf.getEntityManager( applicationId ); -- -- //sanity check for corrupt apps -- Application appEntity = em.getApplication(); -- -- if ( appEntity == null ) { -- logger.warn( "Application does not exist in data. {}", app.getKey() ); -- continue; -- } -- -- CassandraService cass = em.getCass(); -- IndexBucketLocator indexBucketLocator = em.getIndexBucketLocator(); -- -- Keyspace ko = cass.getApplicationKeyspace( applicationId ); -- -- UUID timestampUuid = newTimeUUID(); -- long timestamp = getTimestampInMicros( timestampUuid ); -- -- -- // go through each collection and audit the values -- for ( String collectionName : getCollectionNames( em, line ) ) { -- -- -- IndexScanner scanner = cass.getIdList( cass.getApplicationKeyspace( applicationId ), -- key( applicationId, DICTIONARY_COLLECTIONS, collectionName ), null, null, PAGE_SIZE, false, -- indexBucketLocator, applicationId, collectionName, false ); -- -- SliceIterator itr = new SliceIterator( null, scanner, new UUIDIndexSliceParser() ); -- -- -- while ( itr.hasNext() ) { -- -- Set<ScanColumn> ids = itr.next(); -- -- CollectionInfo collection = getDefaultSchema().getCollection( "application", collectionName ); -- -- -- //We shouldn't have to do this, but otherwise the cursor won't work -- Set<String> indexed = collection.getPropertiesIndexed(); -- -- // what's left needs deleted, do so -- -- logger.info( "Auditing {} entities for collection {} in app {}", new Object[] { -- ids.size(), collectionName, app.getValue() -- } ); -- -- for ( ScanColumn col : ids ) { -- final UUID id = col.getUUID(); -- String type = getDefaultSchema().getCollectionType("application", collectionName); -- -- boolean reIndex = false; -- -- Mutator<ByteBuffer> m = createMutator( ko, be ); -- -- try { -- -- for ( String prop : indexed ) { -- -- String bucket = -- indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, id, prop ); -- -- Object rowKey = key( applicationId, collection.getName(), prop, bucket ); -- -- List<HColumn<ByteBuffer, ByteBuffer>> indexCols = -- scanIndexForAllTypes( ko, indexBucketLocator, applicationId, rowKey, id, prop ); -- -- // loop through the indexed values and verify them as present in -- // our entity_index_entries. If they aren't, we need to delete the -- // from the secondary index, and mark -- // this object for re-index via n update -- for ( HColumn<ByteBuffer, ByteBuffer> index : indexCols ) { -- -- DynamicComposite secondaryIndexValue = -- DynamicComposite.fromByteBuffer( index.getName().duplicate() ); -- -- Object code = secondaryIndexValue.get( 0 ); -- Object propValue = secondaryIndexValue.get( 1 ); -- UUID timestampId = ( UUID ) secondaryIndexValue.get( 3 ); -- -- DynamicComposite existingEntryStart = -- new DynamicComposite( prop, code, propValue, timestampId ); -- DynamicComposite existingEntryFinish = -- new DynamicComposite( prop, code, propValue, timestampId ); -- -- setEqualityFlag( existingEntryFinish, ComponentEquality.GREATER_THAN_EQUAL ); -- -- // now search our EntityIndexEntry for previous values, see if -- // they don't match this one -- -- List<HColumn<ByteBuffer, ByteBuffer>> entries = -- cass.getColumns( ko, ENTITY_INDEX_ENTRIES, id, existingEntryStart, -- existingEntryFinish, INDEX_ENTRY_LIST_COUNT, false ); -- -- // we wouldn't find this column in our entity_index_entries -- // audit. Delete it, then mark this entity for update -- if ( entries.size() == 0 ) { -- logger.info( -- "Could not find reference to value '{}' property '{}'"+ -- " on entity {} in collection {}. " + " Forcing reindex", -- new Object[] { propValue, prop, id, collectionName } ); -- -- addDeleteToMutator( -- m, ENTITY_INDEX, rowKey, index.getName().duplicate(), timestamp ); -- -- reIndex = true; -- } -- -- if ( entries.size() > 1 ) { -- logger.info( -- "Found more than 1 entity referencing unique index " -- + "for property '{}' with value " + "'{}'", -- prop, propValue ); -- reIndex = true; -- } -- } -- } -- -- //force this entity to be updated -- if ( reIndex ) { -- Entity entity = em.get( new SimpleEntityRef( type, id )); -- -- //entity may not exist, but we should have deleted rows from the index -- if ( entity == null ) { -- logger.warn( "Entity with id {} did not exist in app {}", -- id, applicationId ); -- //now execute the cleanup. In this case the entity is gone, -- // so we'll want to remove references from -- // the secondary index -- m.execute(); -- continue; -- } -- -- -- logger.info( "Reindex complete for entity with id '{} ", id ); -- em.update( entity ); -- -- //now execute the cleanup. This way if the above update fails, -- // we still have enough data to run again -- // later -- m.execute(); -- } -- } -- catch ( Exception e ) { -- logger.error( "Unable to process entity with id '{}'", id, e ); -- } -- } -- } -- } -- } -- -- logger.info( "Completed audit of apps" ); -- } -- -- -- private Map<String, UUID> getApplications( EntityManagerFactory emf, CommandLine line ) throws Exception { -- String appName = line.getOptionValue( APPLICATION_ARG ); -- -- if ( appName == null ) { -- return emf.getApplications(); -- } -- -- ApplicationInfo app = managementService.getApplicationInfo( Identifier.from( appName ) ); -- -- if ( app == null ) { -- logger.error( "Could not find application with id or name {}", appName ); -- System.exit( 3 ); -- } -- -- -- Map<String, UUID> apps = new HashMap<String, UUID>(); -- -- apps.put( app.getName(), app.getId() ); -- -- return apps; -- } -- -- -- private Set<String> getCollectionNames( EntityManager em, CommandLine line ) throws Exception { -- -- String collectionName = line.getOptionValue( COLLECTION_ARG ); -- -- if ( collectionName == null ) { -- return em.getApplicationCollections(); -- } -- -- -- Set<String> names = new HashSet<String>(); -- names.add( collectionName ); -- -- return names; -- } -- -- -- private List<HColumn<ByteBuffer, ByteBuffer>> scanIndexForAllTypes( Keyspace ko, -- IndexBucketLocator indexBucketLocator, -- UUID applicationId, Object rowKey, -- UUID entityId, String prop ) throws Exception { -- -- //TODO Determine the index bucket. Scan the entire index for properties with this entityId. -- -- -- DynamicComposite start = null; -- -- List<HColumn<ByteBuffer, ByteBuffer>> cols; -- -- List<HColumn<ByteBuffer, ByteBuffer>> results = new ArrayList<HColumn<ByteBuffer, ByteBuffer>>(); -- -- -- do { -- cols = cass.getColumns( ko, ENTITY_INDEX, rowKey, start, null, 100, false ); -- -- for ( HColumn<ByteBuffer, ByteBuffer> col : cols ) { -- DynamicComposite secondaryIndexValue = DynamicComposite.fromByteBuffer( col.getName().duplicate() ); -- -- UUID storedId = ( UUID ) secondaryIndexValue.get( 2 ); -- -- //add it to the set. We can't short circuit due to property ordering -- if ( entityId.equals( storedId ) ) { -- results.add( col ); -- } -- -- start = secondaryIndexValue; -- } -- } -- while ( cols.size() == 100 ); -- -- return results; -- } --}
