GitHub user snoopdave commented on a diff in the pull request:
https://github.com/apache/incubator-usergrid/pull/275#discussion_r32321377
--- Diff:
stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java ---
@@ -133,84 +163,129 @@ private void importAdminUsers() throws Exception {
*
* @param fileName Name of admin user data file.
*/
- private void importAdminUsers( String fileName ) throws Exception {
+ private void importAdminUsers(final String fileName,
+ final int writeThreadCount,
+ final int auditThreadCount) throws
Exception {
int count = 0;
- File adminUsersFile = new File( importDir, fileName );
+ File adminUsersFile = new File(importDir, fileName);
+
+ logger.info("----- Loading file: " +
adminUsersFile.getAbsolutePath());
+ JsonParser jp = getJsonParserForFile(adminUsersFile);
+
+ int loopCounter = 0;
- logger.info( "----- Loading file: " +
adminUsersFile.getAbsolutePath() );
- JsonParser jp = getJsonParserForFile( adminUsersFile );
+ BlockingQueue<Map<String, Object>> workQueue = new
LinkedBlockingQueue<Map<String, Object>>();
+ BlockingQueue<Map<String, Object>> auditQueue = new
LinkedBlockingQueue<Map<String, Object>>();
+
+ startAdminWorkers(workQueue, auditQueue, writeThreadCount);
+ startAdminAuditors(auditQueue, auditThreadCount);
JsonToken token = jp.nextToken();
- validateStartArray( token );
+ validateStartArray(token);
- EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID
);
+ while (jp.nextValue() != JsonToken.END_ARRAY) {
+ loopCounter += 1;
- while ( jp.nextValue() != JsonToken.END_ARRAY ) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> entityProps =
jp.readValueAs(HashMap.class);
+ if (loopCounter % 100 == 1)
+ logger.info("Publishing to queue... counter=" +
loopCounter);
- @SuppressWarnings( "unchecked" )
- Map<String, Object> entityProps = jp.readValueAs(
HashMap.class );
+ workQueue.add(entityProps);
+ }
- // Import/create the entity
- UUID uuid = getId( entityProps );
- String type = getType( entityProps );
+ waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin
Write");
+ waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin
Audit");
+ logger.info("----- End: Imported {} admin users from file {}",
+ count, adminUsersFile.getAbsolutePath());
- try {
- em.create( uuid, type, entityProps );
+ jp.close();
+ }
- logger.debug( "Imported admin user {} {}", uuid,
entityProps.get( "username" ) );
- count++;
- if ( count % 1000 == 0 ) {
- logger.info("Imported {} admin users", count);
- }
- }
- catch ( DuplicateUniquePropertyExistsException de ) {
- logger.warn( "Unable to create entity. It appears to be a
duplicate: " +
- "id={}, type={}, name={}, username={}",
- new Object[] { uuid, type, entityProps.get("name"),
entityProps.get("username")});
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Exception" , de );
- }
- continue;
- }
+ private static void waitForQueueAndMeasure(final BlockingQueue
workQueue,
+ final Map<Stoppable,
Thread> threadMap,
+ final String identifier)
throws InterruptedException {
+ double rateAverageSum = 0;
+ int iterationCounter = 0;
- if ( em.get( uuid ) == null ) {
- logger.error( "Holy hell, we wrote an entity and it's
missing. " +
- "Entity Id was {} and type is {}", uuid,
type );
- System.exit( 1 );
- }
- echo( entityProps );
+ while (!workQueue.isEmpty()) {
+ iterationCounter += 1;
+
+ int sizeLast = workQueue.size();
+ long lastTime = System.currentTimeMillis();
+ logger.info("Queue {} is not empty, remaining size={},
waiting...", identifier, sizeLast);
+ Thread.sleep(5000);
+
+ long timeNow = System.currentTimeMillis();
+ int sizeNow = workQueue.size();
+
+ int processed = sizeLast - sizeNow;
+
+ long timeDelta = timeNow - lastTime;
+
+ double rateLast = (double) processed / (timeDelta / 1000);
+ rateAverageSum += rateLast;
+
+ long timeRemaining = sizeLast / (long) (rateAverageSum /
iterationCounter);
--- End diff --
I had to change this to:
long timeRemaining = (long) ( sizeLast / (rateAverageSum /
iterationCounter) );
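Otherwise the unit tests throw a divide-by-zero error: in the original
expression the average rate is cast to long before the division, so any
rate below 1 is truncated to 0 and the integer division fails.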
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---