This is an automated email from the ASF dual-hosted git repository.

mrusso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/usergrid.git


The following commit(s) were added to refs/heads/master by this push:
     new a88956c  Made entity export threads configurable. Enables configuring 
number of threads for fetching entities, assets and entity members. This fixes 
issue USERGRID-1356.
     new ac313ab  Merge pull request #620 from keyurkarnik/keyurkarnik_export
a88956c is described below

commit a88956cc5d7aad082dcc4bf6a8020cd52c7ec10c
Author: Keyur Karnik <[email protected]>
AuthorDate: Tue Jan 29 14:37:33 2019 -0800

    Made entity export threads configurable
    Enables configuring number of threads for fetching entities,
    assets and entity members
    This fixes issue USERGRID-1356
---
 .../java/org/apache/usergrid/tools/Export.java     | 91 +++++++++++++++++++---
 1 file changed, 81 insertions(+), 10 deletions(-)

diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java 
b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index b07d09d..9a67945 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -59,7 +60,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -78,6 +78,7 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
@@ -89,7 +90,8 @@ import rx.schedulers.Schedulers;
 public class Export extends ExportingToolBase {
 
     static final Logger logger = LoggerFactory.getLogger( Export.class );
-    public static final String LAST_ID = "lastId";
+       private static final String ENTITY_FETCHER_THREADS = 
"entityFetchThreads";
+       private static final String ENTITY_MEMBER_FETCHER_MULT = 
"entityThreadMult";
     
     
     @Autowired
@@ -100,11 +102,23 @@ public class Export extends ExportingToolBase {
     private AllEntityIdsObservable allEntityIdsObs;
     private SimpleEdge lastEdge = null;
     
+    //number of threads for fetching entity contents. Each thread will handle 
a batch of 1000 entity ids
+    private int entityFetcherThreads = 50;
+    //after an individual entity is fetched, the entity members like assets, 
connections etc need to be fetched
+    //depending on how heavy the assets/connections might be, we might need to 
multiply the factor so that more threads are allocated
+    //for  pulling the members quickly without the queue backing up.
+    private int entityMemberFetcherMultiplier = 1;
+    
+    
     //TODO : Add blocking queues for these executors where appropriate
-    private ExecutorService orgAppCollParallelizer = 
Executors.newFixedThreadPool(3);
-    private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
-       private ExecutorService enitityMemberFetcher = 
Executors.newFixedThreadPool(10);
-       private ExecutorService assetsFetcher = 
Executors.newFixedThreadPool(10);
+    private ExecutorService orgAppCollParallelizer;
+    
+    //fetches the entity content
+    private ExecutorService entityFetcher;
+    //fetches the entity members like connections etc for a given entity
+       private ExecutorService entityMemberFetcher;
+       //fetches the assets for a given entity
+       private ExecutorService assetsFetcher;
        
 
     @Override
@@ -113,13 +127,70 @@ public class Export extends ExportingToolBase {
   
        Options options = super.createOptions();
        
-       Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
-                .withDescription( "Last Entity Id to resume from" ).create( 
LAST_ID );
-       options.addOption( lastId);
+       
+       Option entityFetcherThreads = OptionBuilder.withArgName( 
ENTITY_FETCHER_THREADS ).hasArg()
+                .withDescription( "Number of threads to fetch entities in 
parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
+       options.addOption( entityFetcherThreads);
+       
+       Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( 
ENTITY_MEMBER_FETCHER_MULT ).hasArg()
+                .withDescription( "This defines the number of threads for 
fetching entity members like assets/collections by multiplying the number of 
entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
+       options.addOption( entityMemberFetcherMultiplier);
        
        return options;
     }
+    
+    @Override
+       protected void validateOptions(CommandLine line) throws 
MissingOptionException {
+               super.validateOptions(line);
+
+               if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+                       try {
+                               
Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+                       } catch (NumberFormatException e) {
+                               throw new MissingOptionException("Entity 
fetcher threads need to be a positive integer");
+                       }
+               }
+
+               if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+                       try {
+                               
Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+                       } catch (NumberFormatException e) {
+                               throw new MissingOptionException("Entity member 
thread multiplier needs to be a positive integer");
+                       }
+               }
 
+       }
+    
+    @Override
+       protected void applyExportParams(CommandLine line) {
+
+               super.applyExportParams(line);
+
+               if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+                       entityFetcherThreads = 
Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+
+                       if (entityFetcherThreads < 1) {
+                               entityFetcherThreads = 50;
+                       }
+               }
+
+               if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+                       entityMemberFetcherMultiplier = 
Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+
+                       if (entityMemberFetcherMultiplier < 1) {
+                               entityMemberFetcherMultiplier = 1;
+                       }
+                       if (entityMemberFetcherMultiplier > 5) {
+                               entityMemberFetcherMultiplier = 5;
+                       }
+               }
+
+               orgAppCollParallelizer = Executors.newFixedThreadPool(5, new 
ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
+               entityFetcher = 
Executors.newFixedThreadPool(entityFetcherThreads, new 
ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
+               entityMemberFetcher = 
Executors.newFixedThreadPool(entityFetcherThreads * 
entityMemberFetcherMultiplier, new 
ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
+               assetsFetcher = 
Executors.newFixedThreadPool(entityFetcherThreads * 
entityMemberFetcherMultiplier, new 
ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());
+
+       }
     
     @Override
     public void runTool( CommandLine line ) throws Exception {
@@ -432,7 +503,7 @@ public class Export extends ExportingToolBase {
                                                        
                                                        
ConnectableObservable<Results> entityObs = Observable.just(entities)
                                                                        
.publish();
-                                                       
entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+                                                       
entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));
        
                                                        
                                                        // fetch and write 
connections

Reply via email to