http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --cc 
stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 1c04da9,e7d4fc4..77fd220
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@@ -123,6 -126,8 +126,7 @@@ public abstract class AbstractService i
          this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), 
"importEntities.get");
          this.entitiesParallelGetTimer = metricsFactory.getTimer( 
this.getClass(),"importEntitiesP.get" );
          this.invokeTimer = metricsFactory.getTimer( 
this.getClass(),"service.invoke" );
 -
+         this.cacheFactory = injector.getInstance( CacheFactory.class );
      }
  
  
@@@ -447,45 -456,42 +455,42 @@@
       * @param results
       */
      private void importEntitiesParallel(final ServiceRequest request, final 
Results results ) {
-         Timer.Context timer = entitiesParallelGetTimer.time();
-         try {
-             //create our tuples
-             final Observable<EntityTuple> tuples = Observable.create(new 
Observable.OnSubscribe<EntityTuple>() {
-                 @Override
-                 public void call(final Subscriber<? super EntityTuple> 
subscriber) {
-                     subscriber.onStart();
- 
-                     final List<Entity> entities = results.getEntities();
-                     final int size = entities.size();
-                     for (int i = 0; i < size && !subscriber.isUnsubscribed(); 
i++) {
-                         subscriber.onNext(new EntityTuple(i, 
entities.get(i)));
-                     }
 +
-                     subscriber.onCompleted();
+         //create our tuples
+         final Observable<EntityTuple> tuples = Observable.create(new 
Observable.OnSubscribe<EntityTuple>() {
+             @Override
+             public void call(final Subscriber<? super EntityTuple> 
subscriber) {
+                 subscriber.onStart();
+ 
+                 final List<Entity> entities = results.getEntities();
+                 final int size = entities.size();
+                 for (int i = 0; i < size && !subscriber.isUnsubscribed(); 
i++) {
+                     subscriber.onNext(new EntityTuple(i, entities.get(i)));
                  }
-             });
 -
+                 subscriber.onCompleted();
+             }
+         });
  
-             //now process them in parallel up to 10 threads
+         //now process them in parallel up to 10 threads
  
-             tuples.flatMap(tuple -> {
-                 //map the entity into the tuple
-                 return Observable.just(tuple).doOnNext(parallelTuple -> {
-                     //import the entity and set it at index
-                     try {
+         Observable tuplesObservable = tuples.flatMap(tuple -> {
+             //map the entity into the tuple
+             return Observable.just(tuple).doOnNext(parallelTuple -> {
+                 //import the entity and set it at index
+                 try {
  
-                         final Entity imported = importEntity(request, 
parallelTuple.entity);
+                     final Entity imported = importEntity(request, 
parallelTuple.entity);
  
-                         if (imported != null) {
-                             results.setEntity(parallelTuple.index, imported);
-                         }
-                     } catch (Exception e) {
-                         throw new RuntimeException(e);
+                     if (imported != null) {
+                         results.setEntity(parallelTuple.index, imported);
                      }
-                 }).subscribeOn(rxScheduler);
-             }, 
rxSchedulerFig.getImportThreads()).toBlocking().lastOrDefault(null);
-         } finally {
-             timer.stop();
-         }
+                 } catch (Exception e) {
+                     throw new RuntimeException(e);
+                 }
+             }).subscribeOn(rxScheduler);
+         }, rxSchedulerFig.getImportThreads());
+ 
+         ObservableTimer.time(tuplesObservable, 
entitiesParallelGetTimer).toBlocking().lastOrDefault(null);
      }
  
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java
----------------------------------------------------------------------
diff --cc 
stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java
index fe1edbf,fe1edbf..0000000
deleted file mode 100644,100644
--- 
a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityInsertBenchMark.java
+++ /dev/null
@@@ -1,272 -1,272 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *      http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.usergrid.tools;
--
--
--import java.nio.ByteBuffer;
--import java.util.Stack;
--import java.util.UUID;
--import java.util.concurrent.Callable;
--import java.util.concurrent.ExecutorService;
--import java.util.concurrent.Executors;
--import java.util.concurrent.Future;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--import org.apache.usergrid.persistence.DynamicEntity;
--import org.apache.usergrid.persistence.IndexBucketLocator;
--import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
--import org.apache.usergrid.persistence.cassandra.EntityManagerImpl;
--import org.apache.usergrid.utils.UUIDUtils;
--
--import org.apache.commons.cli.CommandLine;
--import org.apache.commons.cli.Option;
--import org.apache.commons.cli.OptionBuilder;
--import org.apache.commons.cli.Options;
--
--import me.prettyprint.hector.api.Keyspace;
--import me.prettyprint.hector.api.beans.DynamicComposite;
--import me.prettyprint.hector.api.mutation.Mutator;
--
--import static me.prettyprint.hector.api.factory.HFactory.createMutator;
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
--import static 
org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
--import static org.apache.usergrid.persistence.cassandra.Serializers.*;
--
--
--/**
-- * A utility to insert entities into the em for benchmarking
-- *
-- * @author tnine
-- */
--public class EntityInsertBenchMark extends ToolBase {
--
--
--    private static final Logger logger = LoggerFactory.getLogger( 
EntityInsertBenchMark.class );
--
--
--    @Override
--    @SuppressWarnings("static-access")
--    public Options createOptions() {
--
--        Option hostOption =
--                OptionBuilder.withArgName( "host" ).hasArg().isRequired( true 
).withDescription( "Cassandra host" )
--                             .create( "host" );
--
--        Option countOption =
--                OptionBuilder.withArgName( "count" ).hasArg().isRequired( 
true ).withDescription( "Number of records" )
--                             .create( "count" );
--
--        Option appIdOption = OptionBuilder.withArgName( "appId" 
).hasArg().isRequired( true )
--                                          .withDescription( "Application Id 
to use" ).create( "appId" );
--
--        Option workerOption = OptionBuilder.withArgName( "workers" 
).hasArg().isRequired( true )
--                                           .withDescription( "Number of 
workers to use" ).create( "workers" );
--
--
--        Options options = new Options();
--        options.addOption( hostOption );
--        options.addOption( countOption );
--        options.addOption( appIdOption );
--        options.addOption( workerOption );
--
--        return options;
--    }
--
--
--    /*
--     * (non-Javadoc)
--     * 
--     * @see
--     * 
org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
--     */
--    @Override
--    public void runTool( CommandLine line ) throws Exception {
--        startSpring();
--
--        logger.info( "Starting entity cleanup" );
--
--        int workerSize = Integer.parseInt( line.getOptionValue( "workers" ) );
--
--
--        ExecutorService executors = Executors.newFixedThreadPool( workerSize 
);
--
--        int count = Integer.parseInt( line.getOptionValue( "count" ) );
--
--        int size = count / workerSize;
--
--        UUID appId = UUID.fromString( line.getOptionValue( "appId" ) );
--
--        Stack<Future<Void>> futures = new Stack<Future<Void>>();
--
--        for ( int i = 0; i < workerSize; i++ ) {
--            futures.push( executors.submit( new InsertWorker( i, size, appId 
) ) );
--        }
--
--        System.out.println( "Waiting for workers to complete insertion" );
--
--        /**
--         * Wait for all tasks to complete
--         */
--        while ( !futures.isEmpty() ) {
--            futures.pop().get();
--        }
--
--        System.out.println( "All workers completed insertion" );
--    }
--
--
--    private class InsertWorker implements Callable<Void> {
--
--        private int count;
--
--        private int workerNumber;
--
--        private UUID appId;
--
--
--        private InsertWorker( int workerNumber, int count, UUID appId ) {
--            this.workerNumber = workerNumber;
--            this.count = count;
--            this.appId = appId;
--        }
--
--
--        /*
--         * (non-Javadoc)
--         * 
--         * @see java.util.concurrent.Callable#call()
--         */
--        @Override
--        public Void call() throws Exception {
--
--            Keyspace ko = 
EntityInsertBenchMark.this.cass.getApplicationKeyspace( appId );
--            EntityManagerImpl em = ( EntityManagerImpl ) 
emf.getEntityManager( appId );
--            IndexBucketLocator indexBucketLocator = 
em.getIndexBucketLocator();
--
--            for ( int i = 0; i < count; i++ ) {
--
--                Mutator<ByteBuffer> m = createMutator( ko, be );
--
--                DynamicEntity dynEntity = new DynamicEntity();
--                dynEntity.setType( "test" );
--                dynEntity.setUuid( UUIDUtils.newTimeUUID() );
--
--                String value = new StringBuilder().append( workerNumber 
).append( "-" ).append( i ).toString();
--
--
--                String bucketId =
--                        indexBucketLocator.getBucket( appId, 
IndexType.COLLECTION, dynEntity.getUuid(), "test" );
--
--                Object index_name = key( appId, "tests", "test", bucketId );
--
--                IndexEntry entry = new IndexEntry( dynEntity.getUuid(), 
"test", value, UUIDUtils.newTimeUUID() );
--
--                addInsertToMutator( m, ENTITY_INDEX, index_name, 
entry.getIndexComposite(), null,
--                        System.currentTimeMillis() );
--
--                UniqueIndexer indexer = new UniqueIndexer( m );
--                indexer.writeIndex( appId, "tests", dynEntity.getUuid(), 
"test", value );
--                // write this to the direct collection index
--
--                m.execute();
--
--                if ( i % 100 == 0 ) {
--                    System.out.println(
--                            String.format( "%s : Written %d of %d", 
Thread.currentThread().getName(), i, count ) );
--                }
--            }
--
--            return null;
--        }
--    }
--
--
--    private class UniqueIndexer {
--
--        private Mutator<ByteBuffer> mutator;
--
--
--        /**
--         * @param mutator
--         */
--        public UniqueIndexer( Mutator<ByteBuffer> mutator ) {
--            super();
--            this.mutator = mutator;
--        }
--
--
--        private void writeIndex( UUID applicationId, String collectionName, 
UUID entityId, String propName,
--                                 Object entityValue ) {
--
--            Object rowKey = key( applicationId, collectionName, propName, 
entityValue );
--
--            addInsertToMutator( mutator, ENTITY_UNIQUE, rowKey, entityId, 
null, System.currentTimeMillis() );
--        }
--    }
--
--
--    public static class IndexEntry {
--        private final byte code;
--        private String path;
--        private final Object value;
--        private final UUID timestampUuid;
--        private final UUID entityId;
--
--
--        public IndexEntry( UUID entityId, String path, Object value, UUID 
timestampUuid ) {
--            this.entityId = entityId;
--            this.path = path;
--            this.value = value;
--            code = indexValueCode( value );
--            this.timestampUuid = timestampUuid;
--        }
--
--
--        public String getPath() {
--            return path;
--        }
--
--
--        public void setPath( String path ) {
--            this.path = path;
--        }
--
--
--        public Object getValue() {
--            return value;
--        }
--
--
--        public byte getValueCode() {
--            return code;
--        }
--
--
--        public UUID getTimestampUuid() {
--            return timestampUuid;
--        }
--
--
--        public DynamicComposite getIndexComposite() {
--            return new DynamicComposite( code, value, entityId, timestampUuid 
);
--        }
--    }
--}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java
----------------------------------------------------------------------
diff --cc 
stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java
index 1b54495,1b54495..0000000
deleted file mode 100644,100644
--- 
a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityReadBenchMark.java
+++ /dev/null
@@@ -1,353 -1,353 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *      http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.usergrid.tools;
--
--
--import java.nio.ByteBuffer;
--import java.util.ArrayList;
--import java.util.List;
--import java.util.Stack;
--import java.util.UUID;
--import java.util.concurrent.Callable;
--import java.util.concurrent.ExecutorService;
--import java.util.concurrent.Executors;
--import java.util.concurrent.Future;
--import java.util.concurrent.TimeUnit;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--import org.springframework.util.Assert;
--import org.apache.usergrid.persistence.IndexBucketLocator;
--import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
--import org.apache.usergrid.persistence.cassandra.EntityManagerImpl;
--
--import org.apache.commons.cli.CommandLine;
--import org.apache.commons.cli.Option;
--import org.apache.commons.cli.OptionBuilder;
--import org.apache.commons.cli.Options;
--
--import com.yammer.metrics.Metrics;
--import com.yammer.metrics.core.MetricPredicate;
--import com.yammer.metrics.core.Timer;
--import com.yammer.metrics.core.TimerContext;
--import com.yammer.metrics.reporting.ConsoleReporter;
--
--import me.prettyprint.hector.api.Keyspace;
--import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
--import me.prettyprint.hector.api.beans.DynamicComposite;
--import me.prettyprint.hector.api.beans.HColumn;
--import me.prettyprint.hector.api.beans.Row;
--import me.prettyprint.hector.api.beans.Rows;
--import me.prettyprint.hector.api.factory.HFactory;
--import me.prettyprint.hector.api.query.MultigetSliceQuery;
--import me.prettyprint.hector.api.query.QueryResult;
--
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_UNIQUE;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
--import static 
org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
--import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
--import static org.apache.usergrid.persistence.cassandra.Serializers.*;
--
--
--/**
-- * A utility to insert entities into the em for benchmarking
-- *
-- * @author tnine
-- */
--public class EntityReadBenchMark extends ToolBase {
--
--
--
--    private static final Logger logger = LoggerFactory.getLogger( 
EntityReadBenchMark.class );
--
--    private final Timer queryReads =
--            Metrics.newTimer( ReadWorker.class, "entity", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS );
--
--    private final Timer dictReads =
--            Metrics.newTimer( ReadWorker.class, "dictionary", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS );
--
--    private static final String TYPE_DICTIONARY = "dict";
--    private static final String TYPE_ENTITY = "entity";
--
--
--    @Override
--    @SuppressWarnings("static-access")
--    public Options createOptions() {
--
--        Option hostOption =
--                OptionBuilder.withArgName( "host" ).hasArg().isRequired( true 
).withDescription( "Cassandra host" )
--                             .create( "host" );
--
--        Option countOption =
--                OptionBuilder.withArgName( "count" ).hasArg().isRequired( 
true ).withDescription( "Number of records" )
--                             .create( "count" );
--
--        Option appIdOption = OptionBuilder.withArgName( "appId" 
).hasArg().isRequired( true )
--                                          .withDescription( "Application Id 
to use" ).create( "appId" );
--
--        Option workerOption = OptionBuilder.withArgName( "workers" 
).hasArg().isRequired( true )
--                                           .withDescription( "Number of 
workers to use" ).create( "workers" );
--
--
--        Option typeOption = OptionBuilder.withArgName( "type" 
).hasArg().isRequired( true )
--                                         .withDescription( "Read type to use, 
'dict' or 'entity'" ).create( "type" );
--
--        Options options = new Options();
--        options.addOption( hostOption );
--        options.addOption( countOption );
--        options.addOption( appIdOption );
--        options.addOption( workerOption );
--        options.addOption( typeOption );
--
--        return options;
--    }
--
--
--    /*
--     * (non-Javadoc)
--     * 
--     * @see
--     * 
org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
--     */
--    @Override
--    public void runTool( CommandLine line ) throws Exception {
--        startSpring();
--
--        logger.info( "Starting entity cleanup" );
--
--        int workerSize = Integer.parseInt( line.getOptionValue( "workers" ) );
--
--        ExecutorService executors = Executors.newFixedThreadPool( workerSize 
);
--
--        int count = Integer.parseInt( line.getOptionValue( "count" ) );
--
--        int size = count / workerSize;
--
--        UUID appId = UUID.fromString( line.getOptionValue( "appId" ) );
--
--        System.out.println( "Querying unique properties in the search index" 
);
--
--
--        final ConsoleReporter reporter =
--                new ConsoleReporter( Metrics.defaultRegistry(), System.out, 
MetricPredicate.ALL );
--
--        //print every 30 seconds
--        reporter.start( 30, TimeUnit.SECONDS );
--
--        Stack<Future<Void>> futures = new Stack<Future<Void>>();
--
--
--        String type = line.getOptionValue( "type" );
--
--        for ( int i = 0; i < workerSize; i++ ) {
--
--            ReadWorker worker = null;
--
--
--            if ( TYPE_ENTITY.equals( type ) ) {
--                worker = new IndexReadWorker( i, size, appId );
--            }
--            else if ( TYPE_DICTIONARY.equals( type ) ) {
--                worker = new DictReadWorker( i, size, appId );
--            }
--            else {
--                throw new IllegalArgumentException( "You must specifiy the 
'type' option" );
--            }
--
--            futures.push( executors.submit( worker ) );
--        }
--
--
--        System.out.println( "Waiting for index read workers to complete" );
--
--        /**
--         * Wait for all tasks to complete
--         */
--        while ( !futures.isEmpty() ) {
--            futures.pop().get();
--        }
--
--
--        System.out.println( "All workers completed reading" );
--
--
--        //print the report
--        reporter.run();
--    }
--
--
--    private abstract class ReadWorker implements Callable<Void> {
--
--        protected int count;
--
--        protected int workerNumber;
--
--        protected UUID appId;
--
--
--        private ReadWorker( int workerNumber, int count, UUID appId ) throws 
Exception {
--            this.workerNumber = workerNumber;
--            this.count = count;
--            this.appId = appId;
--        }
--
--
--        /*
--         * (non-Javadoc)
--         * 
--         * @see java.util.concurrent.Callable#call()
--         */
--        @Override
--        public Void call() throws Exception {
--
--
--            for ( int i = 0; i < count; i++ ) {
--
--                String value = new StringBuilder().append( workerNumber 
).append( "-" ).append( i ).toString();
--
--
--                doRead( value );
--            }
--
--            return null;
--        }
--
--
--        protected abstract void doRead( String value ) throws Exception;
--    }
--
--
--    private class IndexReadWorker extends ReadWorker {
--
--        private Keyspace keyspace;
--        private IndexBucketLocator indexBucketLocator = null;
--
--
--        private IndexReadWorker( int workerNumber, int count, UUID appId ) 
throws Exception {
--            super( workerNumber, count, appId );
--            keyspace = EntityReadBenchMark.this.cass.getApplicationKeyspace( 
appId );
--            indexBucketLocator = ( ( EntityManagerImpl ) 
EntityReadBenchMark.this.emf.getEntityManager( appId ) )
--                    .getIndexBucketLocator();
--        }
--
--
--        /* (non-Javadoc)
--         * @see 
org.apache.usergrid.tools.EntityReadBenchMark.ReadWorker#doRead()
--         */
--        @Override
--        protected void doRead( String value ) throws Exception {
--            TimerContext timer = queryReads.time();
--
--            Assert.isTrue( read( value ) );
--
--            timer.stop();
--        }
--
--
--        private boolean read( String value ) {
--
--
--            List<String> buckets = indexBucketLocator.getBuckets( appId, 
IndexType.UNIQUE, "tests" );
--
--            List<Object> cassKeys = new ArrayList<Object>( buckets.size() );
--
--            Object keyPrefix = key( appId, "tests", "test" );
--
--            for ( String bucket : buckets ) {
--                cassKeys.add( key( keyPrefix, bucket ) );
--            }
--
--            MultigetSliceQuery<ByteBuffer, DynamicComposite, ByteBuffer> 
multiget =
--                    HFactory.createMultigetSliceQuery( keyspace, be, dce,
--                            be );
--
--            multiget.setColumnFamily( ENTITY_INDEX.getColumnFamily() );
--            multiget.setKeys( bytebuffers( cassKeys ) );
--
--
--            DynamicComposite start = new DynamicComposite( indexValueCode( 
value ), value );
--
--            DynamicComposite finish = new DynamicComposite( indexValueCode( 
value ) );
--            finish.addComponent( 1, value, 
ComponentEquality.GREATER_THAN_EQUAL );
--
--
--            multiget.setRange( start, finish, false, 1 );
--            QueryResult<Rows<ByteBuffer, DynamicComposite, ByteBuffer>> 
results = multiget.execute();
--
--            // search for a column, if one exists, we've found the entity
--            for ( Row<ByteBuffer, DynamicComposite, ByteBuffer> row : 
results.get() ) {
--                if ( row.getColumnSlice().getColumns().size() > 0 ) {
--                    return true;
--                }
--            }
--
--            return false;
--        }
--    }
--
--
--    private class DictReadWorker extends ReadWorker {
--
--
--        UniqueIndexer indexer;
--
--
--        private DictReadWorker( int workerNumber, int count, UUID appId ) 
throws Exception {
--            super( workerNumber, count, appId );
--            Keyspace ko = 
EntityReadBenchMark.this.cass.getApplicationKeyspace( appId );
--            indexer = new UniqueIndexer( ko );
--        }
--
--
--        /* (non-Javadoc)
--         * @see 
org.apache.usergrid.tools.EntityReadBenchMark.ReadWorker#doRead()
--         */
--        @Override
--        protected void doRead( String value ) throws Exception {
--
--            TimerContext timer = dictReads.time();
--
--            Assert.isTrue( indexer.existsInIndex( appId, "tests", "test", 
value ) );
--
--            timer.stop();
--        }
--    }
--
--
--    private class UniqueIndexer {
--
--        private Keyspace keyspace;
--        public UniqueIndexer( Keyspace keyspace ) {
--            super();
--            this.keyspace = keyspace;
--        }
--
--
--        private boolean existsInIndex( UUID applicationId, String 
collectionName, String propName, Object entityValue )
--                throws Exception {
--            Object rowKey = key( applicationId, collectionName, propName, 
entityValue );
--
--
--            List<HColumn<ByteBuffer, ByteBuffer>> cols =
--                    cass.getColumns( keyspace, ENTITY_UNIQUE, rowKey, null, 
null, 2, false );
--
--
--            return cols.size() > 0;
--        }
--    }
--}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index eb616a8,eb616a8..bc83921
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@@ -26,6 -26,6 +26,7 @@@ import java.util.Map.Entry
  import java.util.Set;
  import java.util.UUID;
  
++import org.apache.usergrid.persistence.*;
  import org.codehaus.jackson.JsonEncoding;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonGenerator;
@@@ -33,19 -33,19 +34,12 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  import org.apache.usergrid.management.OrganizationInfo;
  import org.apache.usergrid.management.UserInfo;
--import org.apache.usergrid.persistence.ConnectionRef;
--import org.apache.usergrid.persistence.Entity;
--import org.apache.usergrid.persistence.EntityManager;
--import org.apache.usergrid.persistence.index.query.Query;
--import org.apache.usergrid.persistence.Results;
  import org.apache.usergrid.tools.bean.ExportOrg;
  import org.apache.usergrid.utils.JsonUtils;
  
  import org.apache.commons.cli.CommandLine;
  
  import com.google.common.collect.BiMap;
--import org.apache.usergrid.persistence.SimpleEntityRef;
--import org.apache.usergrid.persistence.index.query.Query.Level;
  
  
  public class Export extends ExportingToolBase {
@@@ -175,7 -175,7 +169,7 @@@
  
                  Query query = new Query();
                  query.setLimit( MAX_ENTITY_FETCH );
--                query.setResultsLevel( Level.ALL_PROPERTIES );
++                query.setResultsLevel( Query.Level.ALL_PROPERTIES );
  
                  Results entities = em.searchCollection( 
em.getApplicationRef(), collectionName, query );
  
@@@ -239,7 -239,7 +233,7 @@@
              // Start collection array.
              jg.writeStartArray();
  
--            Results collectionMembers = em.getCollection( entity, 
collectionName, null, 100000, Level.IDS, false );
++            Results collectionMembers = em.getCollection( entity, 
collectionName, null, 100000, Query.Level.IDS, false );
  
              List<UUID> entityIds = collectionMembers.getIds();
  
@@@ -307,13 -307,13 +301,13 @@@
              jg.writeFieldName( connectionType );
              jg.writeStartArray();
  
--            Results results = em.getConnectedEntities( 
--                    entity, connectionType, null, Level.IDS );
++            Results results = em.getTargetEntities(
++                    entity, connectionType, null, Query.Level.IDS );
  
              List<ConnectionRef> connections = results.getConnections();
  
              for ( ConnectionRef connectionRef : connections ) {
--                jg.writeObject( connectionRef.getConnectedEntity().getUuid() 
);
++                jg.writeObject( connectionRef.getTargetRefs().getUuid() );
              }
  
              jg.writeEndArray();
@@@ -330,8 -330,8 +324,8 @@@
     *    write entity_id : { "collectionName" : [ids]
     *  }
     * }
--   * 
--   * 
++   *
++   *
     *   {
     *     entity_id :
     *       { collection_name :
@@@ -348,9 -348,9 +342,9 @@@
     *         ]
     *       }
     *   }
--   * 
++   *
     * 
http://jackson.codehaus.org/1.8.0/javadoc/org/codehaus/jackson/JsonGenerator.html
--   * 
++   *
     *
     *-
     * List<ConnectedEntityRef> connections = em.getConnections(entityId, 
query);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/Import.java
index 01313e3,01313e3..0861a0a
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Import.java
@@@ -25,6 -25,6 +25,7 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.UUID;
  
++import com.google.common.base.Optional;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonParser;
  import org.codehaus.jackson.JsonToken;
@@@ -366,10 -366,10 +367,10 @@@ public class Import extends ToolBase 
          // Retrieve the namepsace for this collection. It's part of the name
          String applicationName = getApplicationFromColllection( 
collectionFileName );
  
--        UUID appId = emf.lookupApplication( applicationName );
++        Optional<UUID> appId = emf.lookupApplication( applicationName );
  
          //no org in path, this is a pre public beta so we need to create the 
new path
--        if ( appId == null && !applicationName.contains( "/" ) ) {
++        if ( !appId.isPresent() && !applicationName.contains( "/" ) ) {
              String fileName = collectionFileName.replace( "collections", 
"application" );
  
              File applicationFile = new File( importDir, fileName );
@@@ -413,8 -413,8 +414,8 @@@
          }
  
  
--        if ( appId == null ) {
--            logger.error( "Unable to find application with name {}.  Skipping 
collections", appId );
++        if ( !appId.isPresent() ) {
++            logger.error( "Unable to find application with name {}.  Skipping 
collections", applicationName );
              return;
          }
  
@@@ -427,7 -427,7 +428,7 @@@
          jp.nextToken(); // START_OBJECT this is the outter hashmap
  
  
--        EntityManager em = emf.getEntityManager( appId );
++        EntityManager em = emf.getEntityManager( appId.get() );
  
          while ( jp.nextToken() != JsonToken.END_OBJECT ) {
              importEntitysStuff( jp, em );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
index 5c35e85,5c35e85..0000000
deleted file mode 100644,100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
+++ /dev/null
@@@ -1,184 -1,184 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *      http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.usergrid.tools;
--
--
--import java.util.Collection;
--import java.util.Collections;
--import java.util.Map;
--import java.util.Map.Entry;
--import java.util.Set;
--import java.util.UUID;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--import org.apache.usergrid.persistence.EntityManager;
--import org.apache.usergrid.utils.UUIDUtils;
--
--import org.apache.commons.cli.CommandLine;
--import org.apache.commons.cli.Option;
--import org.apache.commons.cli.OptionBuilder;
--import org.apache.commons.cli.Options;
--import org.apache.usergrid.persistence.EntityManagerFactory;
--import org.apache.usergrid.persistence.EntityRef;
--
--
--/**
-- * Index rebuild utility for Usergrid. Can be used to rebuild the index for a 
specific 
-- * application, a specific application's collection or for an entire Usergrid 
system.
-- */
--public class IndexRebuild extends ToolBase {
--
--    private static final String APPLICATION_ARG = "app";
--
--    private static final String COLLECTION_ARG = "col";
--
--    private static final String ALL_ARG = "all";
--
--    private static final int PAGE_SIZE = 100;
--
--
--    private static final Logger logger = LoggerFactory.getLogger( 
IndexRebuild.class );
--
--
--    @Override
--    @SuppressWarnings("static-access")
--    public Options createOptions() {
--
--        Option hostOpt = OptionBuilder.withArgName( "host" 
).hasArg().isRequired( true )
--                .withDescription( "Cassandra host" ).create( "host" );
--
--        Option esHostsOpt = OptionBuilder.withArgName( "host" 
).hasArg().isRequired( true )
--                .withDescription( "ElasticSearch host" ).create( "eshost" );
--
--        Option esClusterOpt = OptionBuilder.withArgName( "host" 
).hasArg().isRequired( true )
--                .withDescription( "ElasticSearch cluster name" ).create( 
"escluster" );
--
--        Option appOpt = OptionBuilder.withArgName( APPLICATION_ARG 
).hasArg().isRequired( false )
--                .withDescription( "Application id or app name" ).create( 
APPLICATION_ARG );
--
--        Option collOpt = OptionBuilder.withArgName( COLLECTION_ARG 
).hasArg().isRequired( false )
--                .withDescription( "Collection name" ).create( COLLECTION_ARG 
);
--
--        Option allOpt = OptionBuilder.withType( Boolean.class )
--                .withArgName( ALL_ARG ).hasArg().isRequired( false )
--                .withDescription( "True to reindex all application" ).create( 
ALL_ARG );
--
--        Options options = new Options();
--        options.addOption( hostOpt );
--        options.addOption( esHostsOpt );
--        options.addOption( esClusterOpt );
--        options.addOption( appOpt );
--        options.addOption( collOpt );
--        options.addOption( allOpt );
--
--        return options;
--    }
--
--
--    /*
--     * (non-Javadoc)
--     * 
--     * @see
--     * 
org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
--     */
--    @Override
--    public void runTool( CommandLine line ) throws Exception {
--        startSpring();
--
--        logger.info( "Starting index rebuild" );
--
--        EntityManagerFactory.ProgressObserver po = new 
EntityManagerFactory.ProgressObserver() {
--            
--            @Override
--            public void onProgress(EntityRef entity) {
--                logger.info("Indexing entity {}:{}", entity.getType(), 
entity.getUuid());
--            }
--
--            @Override
--            public long getWriteDelayTime() {
--                return 100;
--            }
--        };
--
--        emf.rebuildInternalIndexes( po ); 
--        emf.refreshIndex();
--
--        if ( line.getOptionValue("all") != null && 
line.getOptionValue("all").equalsIgnoreCase("true") ) {
--            emf.rebuildAllIndexes( po );
--
--        } else if ( line.getOptionValue( APPLICATION_ARG ) != null ) {
--
--            // Goes through each app id specified
--            for ( UUID appId : getAppIds( line ) ) {
--
--                logger.info( "Reindexing for app id: {}", appId );
--                Set<String> collections = getCollections( line, appId );
--
--                for ( String collection : collections ) {
--                    emf.rebuildCollectionIndex( appId, collection, po );
--                    emf.refreshIndex();
--                }
--            }
--
--        } else {
--
--            Map<String, UUID> ids = emf.getApplications();
--            System.out.println( "Printing all apps" );
--            for ( Entry<String, UUID> entry : ids.entrySet() ) {
--                System.out.println( entry.getKey() + " appid=" + 
entry.getValue() );
--            }
--        }
--
--        logger.info( "Finished index rebuild" );
--    }
--
--
--    /** Get all app id */
--    private Collection<UUID> getAppIds( CommandLine line ) throws Exception {
--
--        String appId = line.getOptionValue( APPLICATION_ARG );
--
--        Map<String, UUID> ids = emf.getApplications();
--
--        if ( appId != null ) {
--            UUID id = UUIDUtils.tryExtractUUID( appId );
--            if ( id == null ) {
--                logger.debug("Got applications: " + ids );
--                id = emf.getApplications().get( appId );
--            }
--            return Collections.singleton( id );
--        }
--
--        return ids.values();
--    }
--
--
--    /** Get collection names. If none are specified, all are returned */
--    private Set<String> getCollections( CommandLine line, UUID appId ) throws 
Exception {
--
--        String passedName = line.getOptionValue( COLLECTION_ARG );
--
--        if ( passedName != null ) {
--            return Collections.singleton( passedName );
--        }
--
--        EntityManager em = emf.getEntityManager( appId );
--
--        return em.getApplicationCollections();
--    }
--}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
----------------------------------------------------------------------
diff --cc 
stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
index 7b18ccc,7b18ccc..0000000
deleted file mode 100644,100644
--- 
a/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
+++ /dev/null
@@@ -1,119 -1,119 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *      http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.usergrid.tools;
--
--
--import java.nio.ByteBuffer;
--import java.util.List;
--import java.util.Map;
--import java.util.UUID;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import org.apache.commons.cli.CommandLine;
--import org.apache.commons.cli.Options;
--import org.apache.log4j.Level;
--import org.apache.log4j.LogManager;
--
--import org.apache.usergrid.management.OrganizationInfo;
--
--import com.google.common.collect.BiMap;
--
--import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
--import me.prettyprint.hector.api.Keyspace;
--import me.prettyprint.hector.api.mutation.Mutator;
--
--import static me.prettyprint.hector.api.factory.HFactory.createMutator;
--import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
--import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
--
--
--public class RepairingMismatchedApplicationMetadata extends ToolBase {
--
--    public static final ByteBufferSerializer be = new ByteBufferSerializer();
--
--    private static final Logger logger = LoggerFactory.getLogger( 
RepairingMismatchedApplicationMetadata.class );
--
--
--    @Override
--    public Options createOptions() {
--        Options options = super.createOptions();
--        return options;
--    }
--
--
--    @Override
--    public void runTool( CommandLine line ) throws Exception {
--        startSpring();
--
--        //sucks, but it's not picking up the configuration
--        LogManager.getLogger( RepairingMismatchedApplicationMetadata.class 
).setLevel( Level.INFO );
--
--        UUID orgId = null;
--        List<OrganizationInfo> orgs;
--
--        final int size = 1000;
--
--
--        do {
--            orgs = managementService.getOrganizations( orgId, size );
--
--
--            for ( OrganizationInfo org : orgs ) {
--
--                orgId = org.getUuid();
--
--                logger.info( "Auditing org {}", org.getName() );
--
--                try {
--                    BiMap<UUID, String> apps = 
managementService.getApplicationsForOrganization( org.getUuid() );
--
--
--                    for ( Map.Entry<UUID, String> app : apps.entrySet() ) {
--
--                        logger.info( "Auditing org {} app {}", org.getName(), 
app.getValue() );
--
--                        UUID applicationId = emf.lookupApplication( 
app.getValue() );
--                        if ( applicationId == null ) {
--                            String appName = app.getValue();
--                            Keyspace ko = cass.getSystemKeyspace();
--                            Mutator<ByteBuffer> m = createMutator( ko, be );
--                            long timestamp = cass.createTimestamp();
--                            addInsertToMutator( m, APPLICATIONS_CF, appName, 
PROPERTY_UUID, app.getKey(), timestamp );
--                            addInsertToMutator( m, APPLICATIONS_CF, appName, 
PROPERTY_NAME, appName, timestamp );
--                            batchExecute( m, RETRY_COUNT );
--                            logger.info( "Repairing alias with app uuid {}, 
and name {}", app.getKey(),
--                                    app.getValue() );
--                        }
--                    }
--                }
--                catch ( Exception e ) {
--                    logger.error( "Unable to process applications for 
organization {}", org, e );
--                }
--            }
--        }
--        while ( orgs != null && orgs.size() == size );
--
--        logger.info( "Completed repairing aliases" );
--        Thread.sleep( 1000 * 60 );
--    }
--}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
----------------------------------------------------------------------
diff --cc stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index aa33e49,aa33e49..a78143b
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@@ -181,7 -181,7 +181,7 @@@ public abstract class ToolBase 
  
          Setup setup = ( (CpEntityManagerFactory) emf ).getSetup();
          logger.info( "Setting up Usergrid schema" );
--        setup.init();
++        setup.initSubsystems();
          logger.info( "Usergrid schema setup" );
  
          logger.info( "Setting up Usergrid management services" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9edb31c5/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java
----------------------------------------------------------------------
diff --cc 
stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java
index 0ae45d6,0ae45d6..0000000
deleted file mode 100644,100644
--- 
a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueIndexCleanup.java
+++ /dev/null
@@@ -1,386 -1,386 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *      http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.usergrid.tools;
--
--
--import java.nio.ByteBuffer;
--import java.util.ArrayList;
--import java.util.HashMap;
--import java.util.HashSet;
--import java.util.List;
--import java.util.Map;
--import java.util.Map.Entry;
--import java.util.Set;
--import java.util.UUID;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--import org.apache.usergrid.management.ApplicationInfo;
--import org.apache.usergrid.persistence.Entity;
--import org.apache.usergrid.persistence.EntityManager;
--import org.apache.usergrid.persistence.EntityManagerFactory;
--import org.apache.usergrid.persistence.index.query.Identifier;
--import org.apache.usergrid.persistence.IndexBucketLocator;
--import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
--import org.apache.usergrid.persistence.cassandra.CassandraService;
--import org.apache.usergrid.persistence.cassandra.EntityManagerImpl;
--import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
--import org.apache.usergrid.persistence.entities.Application;
--import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
--import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
--import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
--import org.apache.usergrid.persistence.schema.CollectionInfo;
--
--import org.apache.commons.cli.CommandLine;
--import org.apache.commons.cli.Option;
--import org.apache.commons.cli.OptionBuilder;
--import org.apache.commons.cli.Options;
--
--import me.prettyprint.hector.api.Keyspace;
--import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
--import me.prettyprint.hector.api.beans.DynamicComposite;
--import me.prettyprint.hector.api.beans.HColumn;
--import me.prettyprint.hector.api.mutation.Mutator;
--
--import static me.prettyprint.hector.api.factory.HFactory.createMutator;
--import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
--import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
--import org.apache.usergrid.persistence.SimpleEntityRef;
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
--import static 
org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
--import static 
org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
--import static org.apache.usergrid.utils.CompositeUtils.setEqualityFlag;
--import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
--import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
--import static org.apache.usergrid.persistence.cassandra.Serializers.*;
--
--
--/**
-- * This is a utility to audit all available entity ids in the secondary 
index. It then checks to see if any index value
-- * is not present in the Entity_Index_Entries. If it is not, the value from 
the index is removed, and a forced re-index
-- * is triggered
-- * <p/>
-- * USERGRID-323
-- * <p/>
-- * <p/>
-- * UniqueIndexCleanup -app [appid] -col [collectionname]
-- *
-- * @author tnine
-- */
--public class UniqueIndexCleanup extends ToolBase {
--
--    /**
--     *
--     */
--    private static final int PAGE_SIZE = 100;
--
--
--
--    private static final Logger logger = LoggerFactory.getLogger( 
UniqueIndexCleanup.class );
--
--    /**
--     *
--     */
--    private static final String APPLICATION_ARG = "app";
--
--    /**
--     *
--     */
--    private static final String COLLECTION_ARG = "col";
--
--
--    @Override
--    @SuppressWarnings("static-access")
--    public Options createOptions() {
--
--
--        Options options = new Options();
--
--        Option hostOption =
--                OptionBuilder.withArgName( "host" ).hasArg().isRequired( true 
).withDescription( "Cassandra host" )
--                             .create( "host" );
--
--
--        options.addOption( hostOption );
--
--
--        Option appOption = OptionBuilder.withArgName( APPLICATION_ARG 
).hasArg().isRequired( false )
--                                        .withDescription( "application id or 
app name" ).create( APPLICATION_ARG );
--
--
--        options.addOption( appOption );
--
--        Option collectionOption = OptionBuilder.withArgName( COLLECTION_ARG 
).hasArg().isRequired( false )
--                                               .withDescription( "colleciton 
name" ).create( COLLECTION_ARG );
--
--        options.addOption( collectionOption );
--
--        return options;
--    }
--
--
--    /*
--     * (non-Javadoc)
--     *
--     * @see
--     * 
org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
--     */
--    @Override
--    public void runTool( CommandLine line ) throws Exception {
--        startSpring();
--
--        logger.info( "Starting entity cleanup" );
--
--        Map<String, UUID> apps = getApplications( emf, line );
--
--
--        for ( Entry<String, UUID> app : apps.entrySet() ) {
--
--            logger.info( "Starting cleanup for app {}", app.getKey() );
--
--            UUID applicationId = app.getValue();
--            EntityManagerImpl em = ( EntityManagerImpl ) 
emf.getEntityManager( applicationId );
--
--            //sanity check for corrupt apps
--            Application appEntity = em.getApplication();
--
--            if ( appEntity == null ) {
--                logger.warn( "Application does not exist in data. {}", 
app.getKey() );
--                continue;
--            }
--
--            CassandraService cass = em.getCass();
--            IndexBucketLocator indexBucketLocator = 
em.getIndexBucketLocator();
--
--            Keyspace ko = cass.getApplicationKeyspace( applicationId );
--
--            UUID timestampUuid = newTimeUUID();
--            long timestamp = getTimestampInMicros( timestampUuid );
--
--
--            // go through each collection and audit the values
--            for ( String collectionName : getCollectionNames( em, line ) ) {
--
--
--                IndexScanner scanner = cass.getIdList( 
cass.getApplicationKeyspace( applicationId ),
--                        key( applicationId, DICTIONARY_COLLECTIONS, 
collectionName ), null, null, PAGE_SIZE, false,
--                        indexBucketLocator, applicationId, collectionName, 
false );
--
--                SliceIterator itr = new SliceIterator( null, scanner, new 
UUIDIndexSliceParser() );
--
--
--                while ( itr.hasNext() ) {
--
--                    Set<ScanColumn> ids = itr.next();
--
--                    CollectionInfo collection = 
getDefaultSchema().getCollection( "application", collectionName );
--
--
--                    //We shouldn't have to do this, but otherwise the cursor 
won't work
--                    Set<String> indexed = collection.getPropertiesIndexed();
--
--                    // what's left needs deleted, do so
--
--                    logger.info( "Auditing {} entities for collection {} in 
app {}", new Object[] {
--                            ids.size(), collectionName, app.getValue()
--                    } );
--
--                    for ( ScanColumn col : ids ) {
--                        final UUID id = col.getUUID();
--                        String type = 
getDefaultSchema().getCollectionType("application", collectionName);
--
--                        boolean reIndex = false;
--
--                        Mutator<ByteBuffer> m = createMutator( ko, be );
--
--                        try {
--
--                            for ( String prop : indexed ) {
--
--                                String bucket =
--                                        indexBucketLocator.getBucket( 
applicationId, IndexType.COLLECTION, id, prop );
--
--                                Object rowKey = key( applicationId, 
collection.getName(), prop, bucket );
--
--                                List<HColumn<ByteBuffer, ByteBuffer>> 
indexCols =
--                                        scanIndexForAllTypes( ko, 
indexBucketLocator, applicationId, rowKey, id, prop );
--
--                                // loop through the indexed values and verify 
them as present in
--                                // our entity_index_entries. If they aren't, 
we need to delete the
--                                // from the secondary index, and mark
--                                // this object for re-index via n update
--                                for ( HColumn<ByteBuffer, ByteBuffer> index : 
indexCols ) {
--
--                                    DynamicComposite secondaryIndexValue =
--                                            DynamicComposite.fromByteBuffer( 
index.getName().duplicate() );
--
--                                    Object code = secondaryIndexValue.get( 0 
);
--                                    Object propValue = 
secondaryIndexValue.get( 1 );
--                                    UUID timestampId = ( UUID ) 
secondaryIndexValue.get( 3 );
--
--                                    DynamicComposite existingEntryStart =
--                                            new DynamicComposite( prop, code, 
propValue, timestampId );
--                                    DynamicComposite existingEntryFinish =
--                                            new DynamicComposite( prop, code, 
propValue, timestampId );
--
--                                    setEqualityFlag( existingEntryFinish, 
ComponentEquality.GREATER_THAN_EQUAL );
--
--                                    // now search our EntityIndexEntry for 
previous values, see if
--                                    // they don't match this one
--
--                                    List<HColumn<ByteBuffer, ByteBuffer>> 
entries =
--                                            cass.getColumns( ko, 
ENTITY_INDEX_ENTRIES, id, existingEntryStart,
--                                                    existingEntryFinish, 
INDEX_ENTRY_LIST_COUNT, false );
--
--                                    // we wouldn't find this column in our 
entity_index_entries
--                                    // audit. Delete it, then mark this 
entity for update
--                                    if ( entries.size() == 0 ) {
--                                        logger.info(
--                                            "Could not find reference to 
value '{}' property '{}'"+
--                                            " on entity {} in collection {}. 
" + " Forcing reindex",
--                                            new Object[] { propValue, prop, 
id, collectionName } );
--
--                                        addDeleteToMutator(
--                                            m, ENTITY_INDEX, rowKey, 
index.getName().duplicate(), timestamp );
--
--                                        reIndex = true;
--                                    }
--
--                                    if ( entries.size() > 1 ) {
--                                        logger.info(
--                                            "Found more than 1 entity 
referencing unique index "
--                                          + "for property '{}' with value " + 
"'{}'",
--                                            prop, propValue );
--                                        reIndex = true;
--                                    }
--                                }
--                            }
--
--                            //force this entity to be updated
--                            if ( reIndex ) {
--                                Entity entity = em.get( new SimpleEntityRef( 
type, id ));
--
--                                //entity may not exist, but we should have 
deleted rows from the index
--                                if ( entity == null ) {
--                                    logger.warn( "Entity with id {} did not 
exist in app {}",
--                                            id, applicationId );
--                                    //now execute the cleanup. In this case 
the entity is gone,
--                                    // so we'll want to remove references from
--                                    // the secondary index
--                                    m.execute();
--                                    continue;
--                                }
--
--
--                                logger.info( "Reindex complete for entity 
with id '{} ", id );
--                                em.update( entity );
--
--                                //now execute the cleanup. This way if the 
above update fails,
--                                // we still have enough data to run again
--                                // later
--                                m.execute();
--                            }
--                        }
--                        catch ( Exception e ) {
--                            logger.error( "Unable to process entity with id 
'{}'", id, e );
--                        }
--                    }
--                }
--            }
--        }
--
--        logger.info( "Completed audit of apps" );
--    }
--
--
--    private Map<String, UUID> getApplications( EntityManagerFactory emf, 
CommandLine line ) throws Exception {
--        String appName = line.getOptionValue( APPLICATION_ARG );
--
--        if ( appName == null ) {
--            return emf.getApplications();
--        }
--
--        ApplicationInfo app = managementService.getApplicationInfo( 
Identifier.from( appName ) );
--
--        if ( app == null ) {
--            logger.error( "Could not find application with id or name {}", 
appName );
--            System.exit( 3 );
--        }
--
--
--        Map<String, UUID> apps = new HashMap<String, UUID>();
--
--        apps.put( app.getName(), app.getId() );
--
--        return apps;
--    }
--
--
--    private Set<String> getCollectionNames( EntityManager em, CommandLine 
line ) throws Exception {
--
--        String collectionName = line.getOptionValue( COLLECTION_ARG );
--
--        if ( collectionName == null ) {
--            return em.getApplicationCollections();
--        }
--
--
--        Set<String> names = new HashSet<String>();
--        names.add( collectionName );
--
--        return names;
--    }
--
--
--    private List<HColumn<ByteBuffer, ByteBuffer>> scanIndexForAllTypes( 
Keyspace ko,
--                                                                        
IndexBucketLocator indexBucketLocator,
--                                                                        UUID 
applicationId, Object rowKey,
--                                                                        UUID 
entityId, String prop ) throws Exception {
--
--        //TODO Determine the index bucket.  Scan the entire index for 
properties with this entityId.
--
--
--        DynamicComposite start = null;
--
--        List<HColumn<ByteBuffer, ByteBuffer>> cols;
--
--        List<HColumn<ByteBuffer, ByteBuffer>> results = new 
ArrayList<HColumn<ByteBuffer, ByteBuffer>>();
--
--
--        do {
--            cols = cass.getColumns( ko, ENTITY_INDEX, rowKey, start, null, 
100, false );
--
--            for ( HColumn<ByteBuffer, ByteBuffer> col : cols ) {
--                DynamicComposite secondaryIndexValue = 
DynamicComposite.fromByteBuffer( col.getName().duplicate() );
--
--                UUID storedId = ( UUID ) secondaryIndexValue.get( 2 );
--
--                //add it to the set.  We can't short circuit due to property 
ordering
--                if ( entityId.equals( storedId ) ) {
--                    results.add( col );
--                }
--
--                start = secondaryIndexValue;
--            }
--        }
--        while ( cols.size() == 100 );
--
--        return results;
--    }
--}

Reply via email to