Updated tests to also contain larger body for measuring performance with more 
accurate entity size


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bab5ba7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bab5ba7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bab5ba7f

Branch: refs/heads/USERGRID-347
Commit: bab5ba7f018c18aab2ee09b98eeb74c6668a893a
Parents: c864ab6
Author: Todd Nine <[email protected]>
Authored: Fri Mar 20 15:23:29 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Fri Mar 20 15:23:29 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/queryindex/pom.xml        |  57 ++--
 .../persistence/index/EntityIndexBatch.java     |   5 +
 .../index/impl/EsEntityIndexBatchImpl.java      |  32 +-
 .../usergrid/persistence/index/query/Query.java |   6 +-
 .../persistence/index/guice/IndexTestFig.java   |   7 +-
 .../index/impl/IndexLoadTestsIT.java            | 317 ++++++++++++++++---
 6 files changed, 329 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml 
b/stack/corepersistence/queryindex/pom.xml
index 2dc40ce..04b3b1a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -45,45 +45,24 @@
                 </executions>
             </plugin>
 
-<!--            <plugin>
-                <groupId>org.safehaus.chop</groupId>
-                <artifactId>chop-maven-plugin</artifactId>
-                <version>${chop.version}</version>
-
-
-                NOTE: you should be putting most of these variables into your 
settings.xml
-                as an automatically activated profile.
-
-
-                <configuration>
-                    <accessKey>${aws.s3.key}</accessKey>
-                    <secretKey>${aws.s3.secret}</secretKey>
-                    <availabilityZone>${availabilityZone}</availabilityZone>
-                    <bucketName>${aws.s3.bucket}</bucketName>
-                    <managerAppUsername>admin</managerAppUsername>
-                    
<managerAppPassword>${manager.app.password}</managerAppPassword>
-                    <testPackageBase>org.apache.usergrid</testPackageBase>
-                    <runnerSSHKeyFile>${runner.ssh.key.file}</runnerSSHKeyFile>
-                    <failIfCommitNecessary>false</failIfCommitNecessary>
-                    <amiID>${ami.id}</amiID>
-                    <instanceType>m1.large</instanceType>
-                    <resultsDirectory>${resultsDirectory}</resultsDirectory>
-                    <dumpType>${dumpType}</dumpType>
-                    <coldRestartTomcat>true</coldRestartTomcat>
-                    <awsSecurityGroup>${security.group}</awsSecurityGroup>
-                    
<runnerKeyPairName>${runner.keypair.name}</runnerKeyPairName>
-                    <runnerCount>6</runnerCount>
-                    <securityGroupExceptions>
-
-                        Add your own IP address as an exception to allow access
-                        but please do this in the settings.xml file .. 
essentially
-                        all parameters should be in the settings.xml file.
-
-                        <param>${myip.address}/32:24981</param>
-                        <param>${myip.address}/32:22</param>
-                    </securityGroupExceptions>
-                </configuration>
-            </plugin>-->
+
+            <plugin>
+                       <groupId>org.apache.maven.plugins</groupId>
+                       <artifactId>maven-surefire-plugin</artifactId>
+
+                       <configuration>
+                           <includes>
+                               <include>**/*IT.java</include>
+                               <include>**/*Test.java</include>
+                           </includes>
+                           <!-- run this one manually to stress test -->
+                           <excludes>
+                               <exclude>**/IndexLoadTestsIT.java</exclude>
+                           </excludes>
+
+                       </configuration>
+
+                   </plugin>
 
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 580a7f4..77b6e7a 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -68,4 +68,9 @@ public interface EntityIndexBatch {
      */
     public BetterFuture execute();
 
+    /**
+     * Get the number of operations in the batch
+     * @return
+     */
+    public int size();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 92312d2..e9ba09c 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -204,6 +204,13 @@ public class EsEntityIndexBatchImpl implements 
EntityIndexBatch {
         return indexBatchBufferProducer.put(tempContainer);
     }
 
+
+    @Override
+    public int size() {
+        return container.getDeIndexRequests().size() + 
container.getIndexRequests().size();
+    }
+
+
     /**
      * Set the entity as a map with the context
      *
@@ -241,23 +248,24 @@ public class EsEntityIndexBatchImpl implements 
EntityIndexBatch {
 
             Field field = ( Field ) f;
 
-            if ( f instanceof ListField ) {
-                List list = ( List ) field.getValue();
-                entityMap.put( field.getName().toLowerCase(),
-                        new ArrayList( processCollectionForMap( list ) ) );
 
-                if ( !list.isEmpty() ) {
-                    if ( list.get( 0 ) instanceof String ) {
-                        entityMap.put( ANALYZED_STRING_PREFIX + 
field.getName().toLowerCase(),
-                                new ArrayList( processCollectionForMap( list ) 
) );
-                    }
-                }
-            }
-            else if ( f instanceof ArrayField ) {
+            if ( f instanceof ArrayField ) {
                 List list = ( List ) field.getValue();
                 entityMap.put( field.getName().toLowerCase(),
                         new ArrayList( processCollectionForMap( list ) ) );
             }
+            else if ( f instanceof ListField ) {
+                           List list = ( List ) field.getValue();
+                           entityMap.put( field.getName().toLowerCase(),
+                                   new ArrayList( processCollectionForMap( 
list ) ) );
+
+                           if ( !list.isEmpty() ) {
+                               if ( list.get( 0 ) instanceof String ) {
+                                   entityMap.put( ANALYZED_STRING_PREFIX + 
field.getName().toLowerCase(),
+                                           new ArrayList( 
processCollectionForMap( list ) ) );
+                               }
+                           }
+                       }
             else if ( f instanceof SetField ) {
                 Set set = ( Set ) field.getValue();
                 entityMap.put( field.getName().toLowerCase(),

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index da68772..67a1731 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -67,7 +67,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-
+/**
+ * TODO, this is a copy from 1.0 and a mess.  Clean this up to be clearer as 
we iterate on our refactor of EM/RM
+ * Query should only be used for term querying, not identity of name lookup, 
that should
+ * come directly from cassandra
+ */
 public class Query {
     private static final Logger logger = LoggerFactory.getLogger( Query.class 
);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
index ecf3dfa..5aacef0 100644
--- 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
+++ 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
@@ -38,7 +38,7 @@ public interface IndexTestFig extends GuicyFig {
     @Default( "16" )
     public int getNumberOfWorkers();
 
-    @Key( "stresstest.numberofRecords" )
+    @Key( "stresstest.numberOfRecords" )
     @Default( "10000" )
     public int getNumberOfRecords();
 
@@ -54,4 +54,9 @@ public interface IndexTestFig extends GuicyFig {
     @Key( "stresstest.applicationId" )
     @Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" )
     public String getApplicationId();
+
+    @Key( "stresstest.readThreads" )
+    @Default( "40" )
+    public int getConcurrentReadThreads();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index 82af950..057c472 100644
--- 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -19,45 +19,78 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.guice.IndexTestFig;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.BooleanField;
+import org.apache.usergrid.persistence.model.field.DoubleField;
 import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static org.junit.Assert.assertEquals;
+
 
 /**
- * TODO: make CorePerformanceIT configurable, add CHOP markup.
+ * This is configuration via the properties in the IndexTestFig object.  Most 
of these values you won't need to touch.
+ * To run this against a live cluster.  You execute this maven command.
+ *
+ * <command> mvn test -Dtest=IndexLoadTestsIT#testHeavyLoadValidate 
-Dstresstest.numWorkers=16
+ * -Dstresstest.numberOfRecords=10000 </command>
+ *
+ * This will insert 10000 records for each worker thread.  There will be 16 
worker threads.  Validation will occur after
+ * the wait timeout (stresstest.validate.wait) of 2 seconds.  Up to 40 
concurrent queries (stresstest.readThreads) will
+ * be executed to validate each result.
+ *
+ * By default this test is excluded from surefire, and will need to be run 
manually
  */
 @RunWith( EsRunner.class )
 @UseModules( { TestIndexModule.class } )
-@Ignore( "Should only be run during load tests of elasticsearch" )
 public class IndexLoadTestsIT extends BaseIT {
     private static final Logger log = LoggerFactory.getLogger( 
IndexLoadTestsIT.class );
+    public static final String FIELD_WORKER_INDEX = "workerIndex";
+    private static final String FIELD_ORDINAL = "ordinal";
+    private static final String FIELD_UNIQUE_IDENTIFIER = "uniqueIdentifier";
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
 
     @ClassRule
     public static ElasticSearchResource es = new ElasticSearchResource();
@@ -69,70 +102,270 @@ public class IndexLoadTestsIT extends BaseIT {
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
+    @Inject
+    public MetricsFactory metricsFactory;
+
+    private Meter batchWriteTPS;
+    private Timer batchWriteTimer;
+
+    private Meter queryTps;
+    private Timer queryTimer;
+
+    private Slf4jReporter reporter;
+
+
+    @Before
+    public void setupMeters() {
+        batchWriteTPS = metricsFactory.getMeter( IndexLoadTestsIT.class, 
"write.tps" );
+
+        batchWriteTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, 
"write.timer" );
+
+        queryTps = metricsFactory.getMeter( IndexLoadTestsIT.class, 
"query.tps" );
+
+        queryTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, 
"query.timer" );
+
+        reporter =
+            Slf4jReporter.forRegistry( metricsFactory.getRegistry() 
).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+                         .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+        reporter.start( 30, TimeUnit.SECONDS );
+    }
+
 
+    @After
+    public void printMetricsBeforeShutdown() {
+        //stop the log reporter and print the last report
+        reporter.stop();
+        reporter.report();
+    }
+
+
+    /**
+     * Perform the following 1, spin up the specified number of workers For 
each worker, insert the specified number of
+     * elements
+     *
+     * Wait the wait time after buffer execution before beginning validate
+     *
+     * Validate every entity inserted is returned by a search.
+     */
     @Test
-    public void testHeavyLoad() {
+    public void testHeavyLoadValidate() {
+        final String userAppId = indexTestFig.getApplicationId();
+
+
+        //if it's unset, generate one
+        final String uniqueIdentifier = UUIDGenerator.newTimeUUID().toString();
 
-        final UUID applicationUUID = UUID.fromString( 
indexTestFig.getApplicationId() );
+        //use the appId supplied, or generate one
+        final UUID applicationUUID = UUID.fromString( userAppId );
 
         final Id applicationId = new SimpleId( applicationUUID, "application" 
);
         final ApplicationScope scope = new ApplicationScopeImpl( applicationId 
);
 
         final EntityIndex index = entityIndexFactory.createEntityIndex( scope 
);
 
+        final IndexScope indexScope = new IndexScopeImpl( applicationId, 
"test" );
+
         //create our index if it doesn't exist
         index.initializeIndex();
 
-        final Observable<Entity> createEntities = createStreamFromWorkers( 
index, applicationId );
 
-        //run them all
-        createEntities.toBlocking().last();
+        //delay our verification for indexing to happen
+        final Observable<DataLoadResult> dataLoadResults =
+            createStreamFromWorkers( index, indexScope, uniqueIdentifier 
).buffer( indexTestFig.getBufferSize() )
+                //perform a delay to let ES index from our batches
+                .delay( indexTestFig.getValidateWait(), TimeUnit.MILLISECONDS )
+                    //do our search in parallel, otherwise this test will take 
far too long
+                .flatMap( entitiesToValidate -> {
+                    return Observable.from( entitiesToValidate ).map( 
entityObservable -> {
+
+
+                        final int workerIndex = ( int ) 
entityObservable.getField( FIELD_WORKER_INDEX ).getValue();
+                        final int ordinal = ( int ) entityObservable.getField( 
FIELD_ORDINAL ).getValue();
+
+
+                        final Timer.Context queryTimerContext = 
queryTimer.time();
+
+
+                        //execute our search
+                        final CandidateResults results = index
+                            .search( indexScope, SearchTypes.fromTypes( 
indexScope.getName() ), Query.fromQLNullSafe(
+                                "select * where " + FIELD_WORKER_INDEX + "  = 
" + workerIndex + " AND " + FIELD_ORDINAL
+                                    + " = " + ordinal + " AND " + 
FIELD_UNIQUE_IDENTIFIER + " = '" + uniqueIdentifier
+                                    + "'" ) );
+
+                        queryTps.mark();
+                        queryTimerContext.stop();
+
+                        boolean found;
+
+                        if ( !results.isEmpty() && results.get( 0 
).getId().equals( entityObservable.getId() ) ) {
+                            found = true;
+                        }
+                        else {
+                            found = false;
+                        }
+
+                        return new EntitySearchResult( entityObservable, found 
);
+                    } ).subscribeOn( Schedulers.io() );
+                }, indexTestFig.getConcurrentReadThreads() )
+
+                    //collect all the results into a single data load result
+                .collect( () -> new DataLoadResult(), ( dataloadResult, 
entitySearchResult ) -> {
+                    if ( entitySearchResult.found ) {
+                        dataloadResult.success();
+                        return;
+                    }
+
+                    final int ordinal = ( int ) 
entitySearchResult.searched.getField( FIELD_ORDINAL ).getValue();
+                    final int worker = ( int ) 
entitySearchResult.searched.getField( FIELD_WORKER_INDEX ).getValue();
+
+                    dataloadResult.failed();
+
+                    log.error(
+                        "Could not find entity with worker {}, ordinal {}, and 
Id {} after waiting {} milliseconds",
+                        worker, ordinal, entitySearchResult.searched.getId(), 
indexTestFig.getValidateWait() );
+                } );
+
+
+        //wait for processing to finish
+        final DataLoadResult result = dataLoadResults.toBlocking().last();
+
+        final long expectedCount = indexTestFig.getNumberOfRecords() * 
indexTestFig.getNumberOfWorkers();
+
+        assertEquals( "Excepted to have no failures", 0, result.getFailCount() 
);
+
+        assertEquals( "Excepted to find all records", expectedCount, 
result.getSuccessCount() );
     }
 
 
-    public Observable<Entity> createStreamFromWorkers( final EntityIndex 
entityIndex, final Id ownerId ) {
+    public Observable<Entity> createStreamFromWorkers( final EntityIndex 
entityIndex, final IndexScope indexScope,
+                                                       final String 
uniqueIdentifier ) {
 
         //create a sequence of observables.  Each index will be it's own 
worker thread using the Schedulers.newthread()
         return Observable.range( 0, indexTestFig.getNumberOfWorkers() 
).flatMap(
-            integer -> createWriteObservable( entityIndex, ownerId, integer 
).subscribeOn( Schedulers.newThread() ) );
+            integer -> createWriteObservable( entityIndex, indexScope, 
uniqueIdentifier, integer )
+                .subscribeOn( Schedulers.newThread() ) );
     }
 
 
-    private Observable<Entity> createWriteObservable( final EntityIndex 
entityIndex, final Id ownerId,
-                                                      final int workerIndex ) {
-
-
-        final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
+    private Observable<Entity> createWriteObservable( final EntityIndex 
entityIndex, final IndexScope indexScope,
+                                                      final String 
uniqueIdentifier, final int workerIndex ) {
 
 
         return Observable.range( 0, indexTestFig.getNumberOfRecords() )
 
             //create our entity
-            .map( new Func1<Integer, Entity>() {
-                @Override
-                public Entity call( final Integer integer ) {
-                    final Entity entity = new Entity( "test" );
-
-                    entity.setField( new IntegerField( "workerIndex", 
workerIndex ) );
-                    entity.setField( new IntegerField( "ordinal", integer ) );
-
-                    return entity;
-                }
-            } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new 
Action1<List<Entity>>() {
-                @Override
-                public void call( final List<Entity> entities ) {
-                    //take our entities and roll them into a batch
-                    Observable.from( entities )
-                              .collect( () -> entityIndex.createBatch(), ( 
entityIndexBatch, entity ) -> {
-
-                                  entityIndexBatch.index( scope, entity );
-                              } ).doOnNext( entityIndexBatch -> {
-                        entityIndexBatch.execute();
-                    } ).toBlocking().last();
-                }
-            } )
+            .map( integer -> {
+                final Entity entity = new Entity( indexScope.getName() );
+
+                entity.setField( new IntegerField( FIELD_WORKER_INDEX, 
workerIndex ) );
+                entity.setField( new IntegerField( FIELD_ORDINAL, integer ) );
+                entity.setField( new StringField( FIELD_UNIQUE_IDENTIFIER, 
uniqueIdentifier ) );
+
+
+                EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+
+                //add some fields for indexing
+
+                entity.setField( new StringField( "emtpyField", "" ) );
+                entity.setField( new StringField( "singleCharField1", "L" ) );
+                entity.setField( new StringField( "longStringField", 
"000000000000001051" ) );
+                entity.setField( new StringField( "singleCharField2", "0" ) );
+                entity.setField( new StringField( "singleCharField3", "0" ) );
+                entity.setField( new StringField( "singleCharField4", "0" ) );
+                entity.setField( new StringField( "dept", "VALUE" ) );
+                entity.setField( new StringField( "description", "I'm a longer 
description" ) );
+
+                ArrayField<Long> array = new ArrayField<>("longs");
+
+                array.add( 9315321008910l );
+                array.add( 9315321009016l );
+                array.add( 9315321009115l );
+                array.add( 9315321009313l );
+                array.add( 9315321009320l );
+                array.add( 9315321984955l );
+
+                entity.setField( array );
+
+                entity.setField( new StringField( "singleCharField5", "N" ) );
+                entity.setField( new BooleanField( "booleanField1", true ) );
+                entity.setField( new BooleanField( "booleanField2", false ) );
+                entity.setField( new StringField( "singleCharField5", "N" ) );
+                entity.setField( new StringField( "singleCharField6", "N" ) );
+                entity.setField( new StringField( "stringField", "ALL CAPS)); 
I MEAN IT" ) );
+                entity.setField( new DoubleField( "doubleField1", 750.0 ) );
+                entity.setField( new StringField( "charField", "AB" ) );
+                entity.setField( new StringField( "name", 
"000000000000001051-1004" ) );
+
 
+                return entity;
+            } )
+                //buffer up a batch size
+            .buffer( indexTestFig.getBufferSize() ).doOnNext( entities -> {
+
+                //take our entities and roll them into a batch
+                Observable.from( entities ).collect( () -> 
entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
+                    entityIndexBatch.index( indexScope, entity );
+                } ).doOnNext( entityIndexBatch -> {
+                    log.info( "Indexing next {} in batch", 
entityIndexBatch.size() );
+                    //gather the metrics
+                    final Timer.Context time = batchWriteTimer.time();
+                    batchWriteTPS.mark();
+
+
+                    //execute
+                    entityIndexBatch.execute();
+                    //stop
+                    time.close();
+                } ).toBlocking().last();
+            } )
                 //translate back into a stream of entities for the caller to 
use
-            .flatMap(entities -> Observable.from( entities ) );
+            .flatMap( entities -> Observable.from( entities ) );
+    }
+
+
+    /**
+     * Class for entity search results
+     */
+    private static class EntitySearchResult {
+
+        public final Entity searched;
+        public final boolean found;
+
+
+        private EntitySearchResult( final Entity searched, final boolean found 
) {
+            this.searched = searched;
+            this.found = found;
+        }
+    }
+
+
+    /**
+     * Class for collecting results
+     */
+    private static final class DataLoadResult {
+        private final AtomicLong successCount = new AtomicLong( 0 );
+        private final AtomicLong failCount = new AtomicLong( 0 );
+
+
+        public void success() {
+            successCount.addAndGet( 1 );
+        }
+
+
+        public long getSuccessCount() {
+            return successCount.get();
+        }
+
+
+        public void failed() {
+            failCount.addAndGet( 1 );
+        }
+
+
+        public long getFailCount() {
+            return failCount.get();
+        }
     }
 }

Reply via email to