This is an automated email from the ASF dual-hosted git repository.
mrusso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/usergrid.git
The following commit(s) were added to refs/heads/master by this push:
new a88956c Made entity export threads configurable. Enables configuring the
number of threads for fetching entities, assets and entity members. This fixes
issue USERGRID-1356.
new ac313ab Merge pull request #620 from keyurkarnik/keyurkarnik_export
a88956c is described below
commit a88956cc5d7aad082dcc4bf6a8020cd52c7ec10c
Author: Keyur Karnik <[email protected]>
AuthorDate: Tue Jan 29 14:37:33 2019 -0800
Made entity export threads configurable
Enables configuring the number of threads for fetching entities,
assets and entity members.
This fixes issue USERGRID-1356
---
.../java/org/apache/usergrid/tools/Export.java | 91 +++++++++++++++++++---
1 file changed, 81 insertions(+), 10 deletions(-)
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index b07d09d..9a67945 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -59,7 +60,6 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -78,6 +78,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -89,7 +90,8 @@ import rx.schedulers.Schedulers;
public class Export extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( Export.class );
- public static final String LAST_ID = "lastId";
+ private static final String ENTITY_FETCHER_THREADS =
"entityFetchThreads";
+ private static final String ENTITY_MEMBER_FETCHER_MULT =
"entityThreadMult";
@Autowired
@@ -100,11 +102,23 @@ public class Export extends ExportingToolBase {
private AllEntityIdsObservable allEntityIdsObs;
private SimpleEdge lastEdge = null;
+ //number of threads for fetching entity contents. Each thread will handle
a batch of 1000 entity ids
+ private int entityFetcherThreads = 50;
+ //after an individual entity is fetched, the entity members like assets,
connections etc need to be fetched
+ //depending on how heavy the assets/connections might be, we might need to
multiply the factor so that more threads are allocated
+ //for pulling the members quickly without the queue backing up.
+ private int entityMemberFetcherMultiplier = 1;
+
+
//TODO : Add blocking queues for these executors where appropriate
- private ExecutorService orgAppCollParallelizer =
Executors.newFixedThreadPool(3);
- private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
- private ExecutorService enitityMemberFetcher =
Executors.newFixedThreadPool(10);
- private ExecutorService assetsFetcher =
Executors.newFixedThreadPool(10);
+ private ExecutorService orgAppCollParallelizer;
+
+ //fetches the entity content
+ private ExecutorService entityFetcher;
+ //fetches the entity members like connections etc for a given entity
+ private ExecutorService entityMemberFetcher;
+ //fetches the assets for a given entity
+ private ExecutorService assetsFetcher;
@Override
@@ -113,13 +127,70 @@ public class Export extends ExportingToolBase {
Options options = super.createOptions();
- Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
- .withDescription( "Last Entity Id to resume from" ).create(
LAST_ID );
- options.addOption( lastId);
+
+ Option entityFetcherThreads = OptionBuilder.withArgName(
ENTITY_FETCHER_THREADS ).hasArg()
+ .withDescription( "Number of threads to fetch entities in
parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
+ options.addOption( entityFetcherThreads);
+
+ Option entityMemberFetcherMultiplier = OptionBuilder.withArgName(
ENTITY_MEMBER_FETCHER_MULT ).hasArg()
+ .withDescription( "This defines the number of threads for
fetching entity members like assets/collections by multiplying the number of
entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
+ options.addOption( entityMemberFetcherMultiplier);
return options;
}
+
+    /**
+     * Validates the export-specific thread options on top of the checks done by the base tool.
+     * When present, both {@code ENTITY_FETCHER_THREADS} and {@code ENTITY_MEMBER_FETCHER_MULT}
+     * must parse as integers greater than or equal to 1, as their error messages promise.
+     *
+     * @param line the parsed command line
+     * @throws MissingOptionException if a base-class check fails, or if either thread option is
+     *         present but not a positive integer (MissingOptionException is reused because it is
+     *         the only checked exception this override's signature declares)
+     */
+    @Override
+    protected void validateOptions(CommandLine line) throws MissingOptionException {
+        super.validateOptions(line);
+
+        requirePositiveInt(line, ENTITY_FETCHER_THREADS,
+                "Entity fetcher threads need to be a positive integer");
+        requirePositiveInt(line, ENTITY_MEMBER_FETCHER_MULT,
+                "Entity member thread multiplier needs to be a positive integer");
+    }
+
+    /**
+     * Throws {@code MissingOptionException(message)} when {@code option} is present on the
+     * command line but its value is not an integer >= 1. Absent options are accepted.
+     */
+    private static void requirePositiveInt(CommandLine line, String option, String message)
+            throws MissingOptionException {
+        if (!line.hasOption(option)) {
+            return;
+        }
+        final int value;
+        try {
+            value = Integer.parseInt(line.getOptionValue(option));
+        } catch (NumberFormatException e) {
+            throw new MissingOptionException(message);
+        }
+        // Previously 0 / negative values slipped through here despite the message above.
+        if (value < 1) {
+            throw new MissingOptionException(message);
+        }
+    }
+
+    /**
+     * Applies the export thread options and creates the executors used by the export.
+     *
+     * <p>{@code ENTITY_FETCHER_THREADS} sets the entity fetcher pool size (values below 1 fall
+     * back to 50). {@code ENTITY_MEMBER_FETCHER_MULT} is clamped to [1, 5], with a warning when
+     * the requested value is adjusted. The entity-member and asset fetcher pools are each sized
+     * {@code entityFetcherThreads * entityMemberFetcherMultiplier}; the org/app/collection
+     * parallelizer always gets 5 threads.
+     *
+     * <p>NOTE(review): the executors are created only here, so they remain {@code null} unless
+     * this method runs before the export starts, and each call creates new pools without shutting
+     * down any previously created ones.
+     *
+     * @param line the parsed command line
+     * @throws IllegalArgumentException if the member/asset pool size would overflow an {@code int}
+     */
+    @Override
+    protected void applyExportParams(CommandLine line) {
+
+        super.applyExportParams(line);
+
+        if (line.hasOption(ENTITY_FETCHER_THREADS)) {
+            entityFetcherThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
+
+            if (entityFetcherThreads < 1) {
+                entityFetcherThreads = 50;
+            }
+        }
+
+        if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
+            int requested = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
+            // Bound the multiplier so a large value cannot blow up the member/asset pools.
+            entityMemberFetcherMultiplier = Math.max(1, Math.min(5, requested));
+            if (entityMemberFetcherMultiplier != requested) {
+                logger.warn("Entity member thread multiplier {} is outside [1, 5]; using {}",
+                        requested, entityMemberFetcherMultiplier);
+            }
+        }
+
+        // Computed once; multiplyExact turns a silent int overflow (which would hand a negative
+        // size to newFixedThreadPool) into an explicit, descriptive failure.
+        final int memberFetcherThreads;
+        try {
+            memberFetcherThreads =
+                    Math.multiplyExact(entityFetcherThreads, entityMemberFetcherMultiplier);
+        } catch (ArithmeticException e) {
+            throw new IllegalArgumentException("Entity member fetcher pool size overflows int: "
+                    + entityFetcherThreads + " * " + entityMemberFetcherMultiplier, e);
+        }
+
+        logger.info("Export threads: entity fetchers={}, member fetchers={}, asset fetchers={}",
+                entityFetcherThreads, memberFetcherThreads, memberFetcherThreads);
+
+        orgAppCollParallelizer = Executors.newFixedThreadPool(5,
+                new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
+        entityFetcher = Executors.newFixedThreadPool(entityFetcherThreads,
+                new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
+        entityMemberFetcher = Executors.newFixedThreadPool(memberFetcherThreads,
+                new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
+        assetsFetcher = Executors.newFixedThreadPool(memberFetcherThreads,
+                new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());
+
+    }
@Override
public void runTool( CommandLine line ) throws Exception {
@@ -432,7 +503,7 @@ public class Export extends ExportingToolBase {
ConnectableObservable<Results> entityObs = Observable.just(entities)
.publish();
-
entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+
entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));
// fetch and write
connections