Added heartbeats to ExportService. Sends one between writing entities every time one takes more than 5 seconds.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bc70a3dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bc70a3dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bc70a3dd Branch: refs/pull/70/merge Commit: bc70a3dd87a6b3970eac4d0f0349ddd93653d646 Parents: 9a16d8f Author: George Reyes <[email protected]> Authored: Thu Feb 13 14:53:40 2014 -0800 Committer: George Reyes <[email protected]> Committed: Thu Feb 13 14:53:40 2014 -0800 ---------------------------------------------------------------------- .../usergrid/management/export/ExportJob.java | 2 +- .../management/export/ExportService.java | 3 +- .../management/export/ExportServiceImpl.java | 136 ++++++++----------- 3 files changed, 63 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java index bd72c6e..ec5f27b 100644 --- a/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java +++ b/stack/services/src/main/java/org/usergrid/management/export/ExportJob.java @@ -39,7 +39,7 @@ public class ExportJob extends OnlyOnceJob { jobExecution.heartbeat(); //pass in jobExecution so that you can call the heartbeat in the do export method. - exportService.doExport( config ); + exportService.doExport( config, jobExecution ); logger.info( "executed ExportJob completed normally" ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java index e2e5d54..aa1dd1b 100644 --- a/stack/services/src/main/java/org/usergrid/management/export/ExportService.java +++ b/stack/services/src/main/java/org/usergrid/management/export/ExportService.java @@ -3,6 +3,7 @@ package org.usergrid.management.export; import java.util.UUID; +import org.usergrid.batch.JobExecution; import org.usergrid.management.ExportInfo; @@ -23,7 +24,7 @@ public interface ExportService { * Perform the export to the external resource * @param config */ - void doExport(ExportInfo config) throws Exception; + void doExport(ExportInfo config, JobExecution jobExecution) throws Exception; /** * Returns the UUID to the user http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bc70a3dd/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java index 5f532c7..dbbe520 100644 --- a/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java +++ b/stack/services/src/main/java/org/usergrid/management/export/ExportServiceImpl.java @@ -19,6 +19,7 @@ import org.codehaus.jackson.util.DefaultPrettyPrinter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.usergrid.batch.JobExecution; import org.usergrid.batch.service.SchedulerService; import org.usergrid.management.ExportInfo; import org.usergrid.management.ManagementService; @@ -40,7 +41,7 @@ import com.google.common.collect.BiMap; * * */ -public class ExportServiceImpl implements ExportService{ +public class ExportServiceImpl implements ExportService { private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class ); @@ -57,6 +58,9 @@ public class ExportServiceImpl implements ExportService{ //Maximum amount of entities retrieved in a single go. public static final int MAX_ENTITY_FETCH = 100; + //Amount of time that has passed before sending another heart beat in millis + public static final int TIMESTAMP_DELTA = 5000; + private JsonFactory jsonFactory = new JsonFactory(); private String outputDir = "/Users/ApigeeCorportation"; @@ -101,17 +105,17 @@ public class ExportServiceImpl implements ExportService{ // good error messages when not found. //validate user has access key to org (rather valid user has admin access token) - //this is token validation + //this is token validation JobData jobData = new JobData(); - jobData.setProperty( "exportInfo",config ); + jobData.setProperty( "exportInfo", config ); long soonestPossible = System.currentTimeMillis() + 250; //sch grace period - JobData retJobData = sch.createJob( "exportJob",soonestPossible, jobData ); + JobData retJobData = sch.createJob( "exportJob", soonestPossible, jobData ); jobUUID = retJobData.getUuid(); try { JobStat merp = sch.getStatsForJob( "exportJob", retJobData.getUuid() ); - System.out.println("hi"); + System.out.println( "hi" ); } catch ( Exception e ) { logger.error( "could not get stats for job" ); @@ -120,24 +124,23 @@ public class ExportServiceImpl implements ExportService{ @Override - public void doExport( final ExportInfo config ) throws Exception { + public void doExport( final ExportInfo config, final JobExecution jobExecution ) throws Exception { Map<UUID, String> organizations = getOrgs(); for ( Map.Entry<UUID, String> organization : organizations.entrySet() ) { - exportApplicationsForOrg( organization , config ); + exportApplicationsForOrg( organization, config, jobExecution ); } } - private Map<UUID, String> getOrgs() throws Exception { // Loop through the organizations - // TODO:this will come from the orgs in schedule when you do the validations. delete orgId + // TODO:this will come from the orgs in schedule when you do the validations. delete orgId UUID orgId = null; Map<UUID, String> organizationNames = null; - // managementService.setup(); + // managementService.setup(); if ( orgId == null ) { @@ -193,47 +196,29 @@ public class ExportServiceImpl implements ExportService{ this.managementService = managementService; } - public UUID getJobUUID () { + + public UUID getJobUUID() { return jobUUID; } -//write test checking to see what happens if the input stream is closed or wrong. -//TODO: make multipart streaming functional - //currently only stores the collection in memory then flushes it. - private void exportApplicationsForOrg( Map.Entry<UUID, String> organization,final ExportInfo config ) throws Exception { - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - //baos.reset(); - //ObjectOutputStream oos = new ObjectOutputStream(baos); - -// OutputStreamWriter osw = new OutputStreamWriter(oos,"UTF-8"); -// PrintWriter out = new PrintWriter( osw ); -// -// //oos.reset(); -// -// Writer wrtJSon = new OutputStreamWriter( oos, "UTF-8" ); + //write test checking to see what happens if the input stream is closed or wrong. + //TODO: make multipart streaming functional + //currently only stores the collection in memory then flushes it. + private void exportApplicationsForOrg( Map.Entry<UUID, String> organization, final ExportInfo config, + final JobExecution jobExecution ) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); logger.info( "" + organization ); - // Loop through the applications per organization BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organization.getKey() ); for ( Map.Entry<UUID, String> application : applications.entrySet() ) { logger.info( application.getValue() + " : " + application.getKey() ); - // Get the JSon serializer. - //Creates the applications folder - /* What needs to be done: - * take the file name generator and create one that will only output the collections we need - * this will probably icnlude taking both file names, and making sure that it is not doing - * two passes as todd had it originally. */ - - // JsonGenerator jg = getJsonGenerator( createOutputFile( "application", application.getValue() ) ); - - String appFileName = prepareOutputFileName( "application", application.getValue() ); + String appFileName = prepareOutputFileName( "application", application.getValue() ); JsonGenerator jg = getJsonGenerator( baos ); @@ -243,8 +228,8 @@ public class ExportServiceImpl implements ExportService{ Entity appEntity = rootEm.get( application.getKey() ); + jobExecution.heartbeat(); Map<String, Object> dictionaries = new HashMap<String, Object>(); - for ( String dictionary : rootEm.getDictionaries( appEntity ) ) { Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary ); @@ -273,40 +258,31 @@ public class ExportServiceImpl implements ExportService{ nsEntity.setMetadata( "counters", entityCounters ); nsEntity.setMetadata( "collections", collections ); + jobExecution.heartbeat(); jg.writeStartArray(); jg.writeObject( nsEntity ); - // Create a GENERATOR for the application collections. - //JsonGenerator collectionsJg = getJsonGenerator( createOutputFile( "collections", application.getValue() ) ); - - //String collectionsFilename = prepareOutputFileName( "collections","appDummyName" ); - //JsonGenerator collectionsJg = getJsonGenerator( oos ); - - - //collectionsJg.writeStartObject(); - // jg.writeStartObject(); Map<String, Object> metadata = em.getApplicationCollectionMetadata(); - //don't need to echo as not a command line tool anymore - //echo( JsonUtils.mapToFormattedJsonString( metadata ) ); + long starting_time = System.currentTimeMillis(); // Loop through the collections. This is the only way to loop // through the entities in the application (former namespace). for ( String collectionName : metadata.keySet() ) { + Query query = new Query(); query.setLimit( MAX_ENTITY_FETCH ); query.setResultsLevel( Results.Level.ALL_PROPERTIES ); Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query ); - while ( entities.size() > 0 ) { + starting_time = checkTimeDelta( starting_time, jobExecution ); + + while ( entities.size() > 0 ) { + jobExecution.heartbeat(); for ( Entity entity : entities ) { - // Export the entity first and later the collections for - // this entity. jg.writeObject( entity ); - //echo( entity ); - saveCollectionMembers( jg, em, application.getValue(), entity ); } @@ -324,21 +300,31 @@ public class ExportServiceImpl implements ExportService{ // Close writer and file for this application. - // logger.warn(); + // logger.warn(); jg.writeEndArray(); jg.close(); - baos.flush(); baos.close(); - InputStream is = new ByteArrayInputStream( baos.toByteArray()); - //InputStream is = new ObjectInputStream ); + InputStream is = new ByteArrayInputStream( baos.toByteArray() ); s3Export.copyToS3( is, config ); - //below line doesn't copy very good data anyways. } } + + public long checkTimeDelta( long startingTime, final JobExecution jobExecution ) { + + long cur_time = System.currentTimeMillis(); + + if ( startingTime <= ( cur_time - TIMESTAMP_DELTA ) ) { + jobExecution.heartbeat(); + return cur_time; + } + return startingTime; + } + + /** * Serialize and save the collection members of this <code>entity</code> * @@ -349,6 +335,8 @@ public class ExportServiceImpl implements ExportService{ private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String application, Entity entity ) throws Exception { + long timestamp = System.currentTimeMillis(); + Set<String> collections = em.getCollections( entity ); // Only create entry for Entities that have collections @@ -356,16 +344,14 @@ public class ExportServiceImpl implements ExportService{ return; } - // jg.writeFieldName( entity.getUuid().toString() ); - jg.writeStartObject(); for ( String collectionName : collections ) { jg.writeFieldName( collectionName ); - // Start collection array. jg.writeStartArray(); - Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false ); + Results collectionMembers = + em.getCollection( entity, collectionName, null, 100000, Results.Level.IDS, false ); List<UUID> entityIds = collectionMembers.getIds(); @@ -389,11 +375,10 @@ public class ExportServiceImpl implements ExportService{ jg.writeEndObject(); } - // protected JsonGenerator getJsonGenerator( String outFile ) throws IOException { - // return getJsonGenerator( new File( outputDir, outFile ) ); - // } - /** Persists the connection for this entity. */ + /** + * Persists the connection for this entity. + */ private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception { jg.writeFieldName( "dictionaries" ); @@ -424,7 +409,9 @@ public class ExportServiceImpl implements ExportService{ } - /** Persists the connection for this entity. */ + /** + * Persists the connection for this entity. + */ private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception { jg.writeFieldName( "connections" ); @@ -449,7 +436,7 @@ public class ExportServiceImpl implements ExportService{ } - protected JsonGenerator getJsonGenerator(ByteArrayOutputStream out ) throws IOException { + protected JsonGenerator getJsonGenerator( ByteArrayOutputStream out ) throws IOException { //TODO:shouldn't the below be UTF-16? //PrintWriter out = new PrintWriter( outFile, "UTF-8" ); @@ -459,8 +446,9 @@ public class ExportServiceImpl implements ExportService{ return jg; } + protected File createOutputFile( String type, String name ) { - return new File(prepareOutputFileName( type, name ) ); + return new File( prepareOutputFileName( type, name ) ); } @@ -492,12 +480,8 @@ public class ExportServiceImpl implements ExportService{ return outputFileName; } + @Autowired @Override - public void setS3Export (S3Export s3Export) { this.s3Export = s3Export; } - - - - - + public void setS3Export( S3Export s3Export ) { this.s3Export = s3Export; } }
