http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/export/ExportServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/ExportServiceImpl.java deleted file mode 100644 index eadc925..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportServiceImpl.java +++ /dev/null @@ -1,634 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.management.export; - - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.batch.JobExecution; -import org.apache.usergrid.batch.service.SchedulerService; -import org.apache.usergrid.management.ApplicationInfo; -import org.apache.usergrid.management.ManagementService; -import org.apache.usergrid.persistence.ConnectionRef; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.PagingResultsIterator; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.SimpleEntityRef; -import org.apache.usergrid.persistence.entities.Export; -import org.apache.usergrid.persistence.entities.JobData; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.Query.Level; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.BiMap; - - -/** - * Need to refactor out the mutliple orgs being take , need to factor out the multiple apps it will just be the one app - * and the one org and all of it's collections. - */ -public class ExportServiceImpl implements ExportService { - - - private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class ); - public static final String EXPORT_ID = "exportId"; - public static final String EXPORT_JOB_NAME = "exportJob"; - //dependency injection - private SchedulerService sch; - - //injected the Entity Manager Factory - protected EntityManagerFactory emf; - - //inject Management Service to access Organization Data - private ManagementService managementService; - - //Maximum amount of entities retrieved in a single go. - public static final int MAX_ENTITY_FETCH = 1000; - - //Amount of time that has passed before sending another heart beat in millis - public static final int TIMESTAMP_DELTA = 5000; - - private JsonFactory jsonFactory = new JsonFactory(); - - - @Override - public UUID schedule( final Map<String, Object> config ) throws Exception { - - if ( config == null ) { - logger.error( "export information cannot be null" ); - return null; - } - - EntityManager em; - try { - em = emf.getEntityManager( emf.getManagementAppId() ); - Set<String> collections = em.getApplicationCollections(); - - if ( !collections.contains( "exports" ) ) { - em.createApplicationCollection( "exports" ); - } - } - catch ( Exception e ) { - logger.error( "application doesn't exist within the current context" ); - return null; - } - - Export export = new Export(); - - //update state - try { - export = em.create( export ); - } - catch ( Exception e ) { - logger.error( "Export entity creation failed" ); - return null; - } - - export.setState( Export.State.CREATED ); - em.update( export ); - - //set data to be transferred to exportInfo - JobData jobData = new JobData(); - jobData.setProperty( "exportInfo", config ); - jobData.setProperty( EXPORT_ID, export.getUuid() ); - - long soonestPossible = System.currentTimeMillis() + 250; //sch grace period - - //schedule job - sch.createJob( EXPORT_JOB_NAME, soonestPossible, jobData ); - - //update state - export.setState( Export.State.SCHEDULED ); - em.update( export ); - - return export.getUuid(); - } - - - /** - * Query Entity Manager for the string state of the Export Entity. This corresponds to the GET /export - * - * @return String - */ - @Override - public String getState( final UUID uuid ) throws Exception { - - if ( uuid == null ) { - logger.error( "UUID passed in cannot be null." ); - return "UUID passed in cannot be null"; - } - - EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() ); - - //retrieve the export entity. - Export export = rootEm.get( uuid, Export.class ); - - if ( export == null ) { - logger.error( "no entity with that uuid was found" ); - return "No Such Element found"; - } - return export.getState().toString(); - } - - - @Override - public String getErrorMessage( final UUID appId, final UUID uuid ) throws Exception { - - //get application entity manager - if ( appId == null ) { - logger.error( "Application context cannot be found." ); - return "Application context cannot be found."; - } - - if ( uuid == null ) { - logger.error( "UUID passed in cannot be null." ); - return "UUID passed in cannot be null"; - } - - EntityManager rootEm = emf.getEntityManager( appId ); - - //retrieve the export entity. - Export export = rootEm.get( uuid, Export.class ); - - if ( export == null ) { - logger.error( "no entity with that uuid was found" ); - return "No Such Element found"; - } - return export.getErrorMessage(); - } - - - @Override - public void doExport( final JobExecution jobExecution ) throws Exception { - @SuppressWarnings("unchecked") - Map<String, Object> config = ( Map<String, Object> ) jobExecution.getJobData().getProperty( "exportInfo" ); - Object s3PlaceHolder = jobExecution.getJobData().getProperty( "s3Export" ); - S3Export s3Export; - - if ( config == null ) { - logger.error( "Export Information passed through is null" ); - return; - } - //get the entity manager for the application, and the entity that this Export corresponds to. - UUID exportId = ( UUID ) jobExecution.getJobData().getProperty( EXPORT_ID ); - - EntityManager em = emf.getEntityManager( emf.getManagementAppId() ); - Export export = em.get( exportId, Export.class ); - - //update the entity state to show that the job has officially started. - export.setState( Export.State.STARTED ); - em.update( export ); - try { - if ( s3PlaceHolder != null ) { - s3Export = ( S3Export ) s3PlaceHolder; - } - else { - s3Export = new S3ExportImpl(); - } - } - catch ( Exception e ) { - logger.error( "S3Export doesn't exist" ); - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - - if ( config.get( "organizationId" ) == null ) { - logger.error( "doExport: No organization could be found" ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - else if ( config.get( "applicationId" ) == null ) { - //exports All the applications from an organization - try { - exportApplicationsFromOrg( ( UUID ) config.get( "organizationId" ), config, jobExecution, s3Export ); - } - catch ( Exception e ) { - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - } - else if ( config.get( "collectionName" ) == null ) { - //exports an Application from a single organization - try { - exportApplicationFromOrg( ( UUID ) config.get( "organizationId" ), - ( UUID ) config.get( "applicationId" ), config, jobExecution, s3Export ); - } - catch ( Exception e ) { - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - } - else { - try { - //exports a single collection from an app org combo - try { - exportCollectionFromOrgApp( ( UUID ) config.get( "applicationId" ), config, jobExecution, - s3Export ); - } - catch ( Exception e ) { - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - } - catch ( Exception e ) { - //if for any reason the backing up fails, then update the entity with a failed state. - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - em.update( export ); - return; - } - } - export.setState( Export.State.FINISHED ); - em.update( export ); - } - - - public SchedulerService getSch() { - return sch; - } - - - public void setSch( final SchedulerService sch ) { - this.sch = sch; - } - - - public EntityManagerFactory getEmf() { - return emf; - } - - - public void setEmf( final EntityManagerFactory emf ) { - this.emf = emf; - } - - - public ManagementService getManagementService() { - - return managementService; - } - - - public void setManagementService( final ManagementService managementService ) { - this.managementService = managementService; - } - - - public Export getExportEntity( final JobExecution jobExecution ) throws Exception { - - UUID exportId = ( UUID ) jobExecution.getJobData().getProperty( EXPORT_ID ); - EntityManager exportManager = emf.getEntityManager( emf.getManagementAppId() ); - - return exportManager.get( exportId, Export.class ); - } - - - /** - * Exports All Applications from an Organization - */ - private void exportApplicationsFromOrg( UUID organizationUUID, final Map<String, Object> config, - final JobExecution jobExecution, S3Export s3Export ) throws Exception { - - //retrieves export entity - Export export = getExportEntity( jobExecution ); - String appFileName = null; - - BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organizationUUID ); - - for ( Map.Entry<UUID, String> application : applications.entrySet() ) { - - if ( application.getValue().equals( - managementService.getOrganizationByUuid( organizationUUID ).getName() + "/exports" ) ) { - continue; - } - - appFileName = prepareOutputFileName( application.getValue(), null ); - - File ephemeral = collectionExportAndQuery( application.getKey(), config, export, jobExecution ); - - fileTransfer( export, appFileName, ephemeral, config, s3Export ); - } - } - - - public void fileTransfer( Export export, String appFileName, File ephemeral, Map<String, Object> config, - S3Export s3Export ) { - try { - s3Export.copyToS3( ephemeral, config, appFileName ); - - } - catch ( Exception e ) { - export.setErrorMessage( e.getMessage() ); - export.setState( Export.State.FAILED ); - return; - } - } - - - /** - * Exports a specific applications from an organization - */ - private void exportApplicationFromOrg( UUID organizationUUID, UUID applicationId, final Map<String, Object> config, - final JobExecution jobExecution, S3Export s3Export ) throws Exception { - - //retrieves export entity - Export export = getExportEntity( jobExecution ); - - ApplicationInfo application = managementService.getApplicationInfo( applicationId ); - String appFileName = prepareOutputFileName( application.getName(), null ); - - File ephemeral = collectionExportAndQuery(applicationId, config, export, jobExecution); - - fileTransfer( export, appFileName, ephemeral, config, s3Export ); - } - - - /** - * Exports a specific collection from an org-app combo. - */ - //might be confusing, but uses the /s/ inclusion or exclusion nomenclature. - private void exportCollectionFromOrgApp( UUID applicationUUID, final Map<String, Object> config, - final JobExecution jobExecution, S3Export s3Export ) throws Exception { - - //retrieves export entity - Export export = getExportEntity( jobExecution ); - ApplicationInfo application = managementService.getApplicationInfo( applicationUUID ); - - String appFileName = prepareOutputFileName( application.getName(), ( String ) config.get( "collectionName" ) ); - - - File ephemeral = collectionExportAndQuery( applicationUUID, config, export, jobExecution ); - - fileTransfer( export, appFileName, ephemeral, config, s3Export ); - } - - - /** - * Regulates how long to wait until the next heartbeat. - */ - public long checkTimeDelta( long startingTime, final JobExecution jobExecution ) { - - long cur_time = System.currentTimeMillis(); - - if ( startingTime <= ( cur_time - TIMESTAMP_DELTA ) ) { - jobExecution.heartbeat(); - return cur_time; - } - return startingTime; - } - - - /** - * Serialize and save the collection members of this <code>entity</code> - * - * @param em Entity Manager - * @param collection Collection Name - * @param entity entity - */ - private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String collection, Entity entity ) - throws Exception { - - // Write connections - saveConnections( entity, em, jg ); - // Write dictionaries - saveDictionaries( entity, em, jg ); - - Set<String> collections = em.getCollections( entity ); - - // If your application doesn't have any e - if ( ( collections == null ) || collections.isEmpty() ) { - return; - } - - for ( String collectionName : collections ) { - - if ( collectionName.equals( collection ) ) { - jg.writeFieldName( collectionName ); - jg.writeStartArray(); - - //is 100000 an arbitary number? - Results collectionMembers = - em.getCollection( entity, collectionName, null, 100000, Level.IDS, false ); - - List<UUID> entityIds = collectionMembers.getIds(); - - if ( ( entityIds != null ) && !entityIds.isEmpty() ) { - for ( UUID childEntityUUID : entityIds ) { - jg.writeObject( childEntityUUID.toString() ); - } - } - - // End collection array. - jg.writeEndArray(); - } - } - } - - - /** - * Persists the connection for this entity. - */ - private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception { - - jg.writeFieldName( "dictionaries" ); - jg.writeStartObject(); - - Set<String> dictionaries = em.getDictionaries( entity ); - for ( String dictionary : dictionaries ) { - - Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary ); - - // nothing to do - if ( dict.isEmpty() ) { - continue; - } - - jg.writeFieldName( dictionary ); - - jg.writeStartObject(); - - for ( Map.Entry<Object, Object> entry : dict.entrySet() ) { - jg.writeFieldName( entry.getKey().toString() ); - jg.writeObject( entry.getValue() ); - } - - jg.writeEndObject(); - } - jg.writeEndObject(); - } - - - /** - * Persists the connection for this entity. - */ - private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception { - - jg.writeFieldName( "connections" ); - jg.writeStartObject(); - - Set<String> connectionTypes = em.getConnectionTypes( entity ); - for ( String connectionType : connectionTypes ) { - - jg.writeFieldName( connectionType ); - jg.writeStartArray(); - - Results results = em.getTargetEntities( - new SimpleEntityRef(entity.getType(), entity.getUuid()), - connectionType, null, Level.IDS); - - List<ConnectionRef> connections = results.getConnections(); - - for ( ConnectionRef connectionRef : connections ) { - jg.writeObject( connectionRef.getTargetRefs().getUuid() ); - } - - jg.writeEndArray(); - } - jg.writeEndObject(); - } - - - protected JsonGenerator getJsonGenerator( File ephermal ) throws IOException { - //TODO:shouldn't the below be UTF-16? - - JsonGenerator jg = jsonFactory.createJsonGenerator( ephermal, JsonEncoding.UTF8 ); - jg.setPrettyPrinter( new DefaultPrettyPrinter( ) ); - jg.setCodec( new ObjectMapper() ); - return jg; - } - - - /** - * @return the file name concatenated with the type and the name of the collection - */ - public String prepareOutputFileName( String applicationName, String CollectionName ) { - StringBuilder str = new StringBuilder(); - str.append( applicationName ); - str.append( "." ); - if ( CollectionName != null ) { - str.append( CollectionName ); - str.append( "." ); - } - str.append( System.currentTimeMillis() ); - str.append( ".json" ); - - String outputFileName = str.toString(); - - return outputFileName; - } - - - /** - * handles the query and export of collections - */ - //TODO:Needs further refactoring. - protected File collectionExportAndQuery( UUID applicationUUID, final Map<String, Object> config, Export export, - final JobExecution jobExecution ) throws Exception { - - EntityManager em = emf.getEntityManager( applicationUUID ); - Map<String, Object> metadata = em.getApplicationCollectionMetadata(); - long starting_time = System.currentTimeMillis(); - File ephemeral = new File( "tempExport" + UUID.randomUUID() ); - ephemeral.deleteOnExit(); - - - JsonGenerator jg = getJsonGenerator( ephemeral ); - - jg.writeStartObject(); - jg.writeObjectFieldStart( "collections" ); - - for ( String collectionName : metadata.keySet() ) { - - if ( collectionName.equals( "exports" ) ) { - continue; - } - //if the collection you are looping through doesn't match the name of the one you want. Don't export it. - if ( ( config.get( "collectionName" ) == null ) || collectionName.equalsIgnoreCase((String)config.get( "collectionName" ) ) ) { - - //write out the collection name at the start of the file - jg.writeArrayFieldStart( collectionName.toLowerCase() ); - - //Query entity manager for the entities in a collection - Query query = null; - if ( config.get( "query" ) == null ) { - query = new Query(); - } - else { - try { - query = Query.fromQL( ( String ) config.get( "query" ) ); - } - catch ( Exception e ) { - export.setErrorMessage( e.getMessage() ); - } - } - query.setLimit( MAX_ENTITY_FETCH ); - query.setResultsLevel( Level.ALL_PROPERTIES ); - query.setCollection( collectionName ); - - Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query ); - - //pages through the query and backs up all results. - PagingResultsIterator itr = new PagingResultsIterator( entities ); - for ( Object e : itr ) { - starting_time = checkTimeDelta( starting_time, jobExecution ); - Entity entity = ( Entity ) e; - jg.writeStartObject(); - jg.writeFieldName( "Metadata" ); - jg.writeObject( entity ); - saveCollectionMembers( jg, em, ( String ) config.get( "collectionName" ), entity ); - jg.writeEndObject(); - jg.flush(); - - } - - - - //write out the end collection - jg.writeEndArray(); - } - } - jg.writeEndObject(); - jg.writeEndObject(); - jg.flush(); - jg.close(); - - return ephemeral; - } -}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/export/S3Export.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3Export.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3Export.java deleted file mode 100644 index 6adfa77..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3Export.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.management.export; - - -import java.io.File; -import java.util.Map; - - -public interface S3Export { - void copyToS3( File ephemeral,Map<String,Object> exportInfo, String filename ); -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java deleted file mode 100644 index 824aca9..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.management.export; - - -import com.google.common.collect.ImmutableSet; -import com.google.common.hash.Hashing; -import com.google.common.io.Files; -import com.google.inject.Module; -import org.jclouds.ContextBuilder; -import org.jclouds.blobstore.BlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.BlobBuilder; -import org.jclouds.blobstore.options.PutOptions; -import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule; -import org.jclouds.logging.log4j.config.Log4JLoggingModule; -import org.jclouds.netty.config.NettyPayloadModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Map; -import java.util.Properties; - - -public class S3ExportImpl implements S3Export { - Logger logger = LoggerFactory.getLogger( S3ExportImpl.class ); - - @Override - public void copyToS3( File ephemeral, final Map<String,Object> exportInfo, String filename ) { - - /*won't need any of the properties as I have the export info*/ - Map<String,Object> properties = ( Map<String, Object> ) exportInfo.get( "properties" ); - - Map<String, Object> storage_info = (Map<String,Object>)properties.get( "storage_info" ); - - String bucketName = ( String ) storage_info.get( "bucket_location" ); - String accessId = ( String ) storage_info.get( "s3_access_id"); - String secretKey = ( String ) storage_info.get( "s3_key" ); - - Properties overrides = new Properties(); - overrides.setProperty( "s3" + ".identity", accessId ); - overrides.setProperty( "s3" + ".credential", secretKey ); - - final Iterable<? extends Module> MODULES = ImmutableSet - .of( new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(), - new NettyPayloadModule() ); - - BlobStoreContext context = ContextBuilder.newBuilder( "s3" ) - .credentials(accessId, secretKey) - .modules(MODULES) - .overrides(overrides) - .buildView(BlobStoreContext.class); - - // Create Container (the bucket in s3) - try { - BlobStore blobStore = context.getBlobStore(); - if ( blobStore.createContainerInLocation(null, bucketName) ) { - logger.info( "Created bucket {}", bucketName ); - } - } - catch ( Exception ex ) { - logger.error( "Could not start binary service: {}", ex.getMessage() ); - return; - } - - try { - BlobStore blobStore = context.getBlobStore(); - - // need this for JClouds 1.7.x: -// BlobBuilder blobBuilder = -// blobStore.blobBuilder( filename ).payload( ephemeral ).calculateMD5().contentType( "application/json" ); - - // needed for JClouds 1.8.x: - BlobBuilder blobBuilder = blobStore.blobBuilder( filename ) - .payload( ephemeral ) - .contentMD5(Files.hash( ephemeral, Hashing.md5() )) - .contentType("application/json"); - - Blob blob = blobBuilder.build(); - - final String uploadedFile = blobStore.putBlob( - bucketName, blob, PutOptions.Builder.multipart() ); - - logger.info("Uploaded file name={} etag={}", filename, uploadedFile ); - } - catch ( Exception e ) { - logger.error( "Error uploading to blob store", e ); - } - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java deleted file mode 100644 index be0b943..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.management.importer; - -import org.apache.usergrid.batch.JobExecution; -import org.apache.usergrid.batch.job.OnlyOnceJob; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.entities.FileImport; -import org.apache.usergrid.persistence.entities.Import; -import org.apache.usergrid.persistence.entities.JobData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import java.util.List; -import java.util.UUID; -import org.apache.usergrid.persistence.Query.Level; - - -@Component("fileImportJob") -public class FileImportJob extends OnlyOnceJob { - - public static final String FILE_IMPORT_ID = "fileImportId"; - private static final Logger logger = LoggerFactory.getLogger(FileImportJob.class); - - @Autowired - EntityManagerFactory emf; - - @Autowired - ImportService importService; - - public FileImportJob() { - if (logger.isTraceEnabled()) { - logger.info("FileImportJob created"); - } - } - - @Override - protected void doJob(JobExecution jobExecution) throws Exception { - logger.info("execute FileImportJob {}", jobExecution.toString()); - - try { - JobData jobData = jobExecution.getJobData(); - if (jobData == null) { - logger.error("jobData cannot be null"); - return; - } - - // heartbeat to indicate job has started - jobExecution.heartbeat(); - - // call the File Parser for the file set in job execution - importService.downloadAndImportFile(jobExecution); - - } catch ( Throwable t ) { - logger.debug("Error importing file", t); - - // update file import record - UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID); - EntityManager em = emf.getEntityManager(emf.getManagementAppId()); - FileImport fileImport = em.get(fileImportId, FileImport.class); - fileImport.setState( FileImport.State.FAILED ); - em.update( fileImport ); - - throw t; - } - - logger.info("File Import Service completed job: {}", jobExecution.getJobName() ); - } - - @Override - protected long getDelay(JobExecution execution) throws Exception { - return 100; - } - - @Autowired - public void setImportService( final ImportService importService ) { - this.importService = importService; - } - - /** - * This method is called when the job is retried maximum times by the scheduler but still fails. - * Thus the scheduler marks it as DEAD. - */ - @Override - public void dead( final JobExecution execution ) throws Exception { - - // Get the root entity manager - EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() ); - - // Mark the sub-job i.e. File Import Job as Failed - FileImport fileImport = null;//importService.getFileImportEntity(execution); - fileImport.setErrorMessage("The Job has been tried maximum times but still failed"); - fileImport.setState(FileImport.State.FAILED); - rootEm.update(fileImport); - - // If one file Job fails, mark the main import Job also as failed - Results ImportJobResults = rootEm.getSourceEntities( - fileImport, "includes", null, Level.ALL_PROPERTIES); - List<Entity> importEntity = ImportJobResults.getEntities(); - UUID importId = importEntity.get(0).getUuid(); - Import importUG = rootEm.get(importId, Import.class); - importUG.setState(Import.State.FAILED); - rootEm.update(importUG); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java deleted file mode 100644 index e1170c3..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.management.importer; - - -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.entities.FailedImportConnection; -import org.apache.usergrid.persistence.entities.FailedImportEntity; -import org.apache.usergrid.persistence.entities.FileImport; -import org.apache.usergrid.persistence.exceptions.PersistenceException; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Statistics used to track a file import. Only 1 instance of this class should exist - * per file imported in the cluster. There is a direct 1-1 mapping of the statistics provided - * here and the file import status. This class is thread-safe to be used across multiple threads. - */ -public class FileImportTracker { - - private static final String ERROR_MESSAGE = - "Failed to import some data. See the import counters and errors."; - - /** - * Connection name to log individual errors - */ - public static final String ERRORS_CONNECTION_NAME = "errors"; - - private final AtomicLong entitiesWritten = new AtomicLong( 0 ); - private final AtomicLong entitiesFailed = new AtomicLong( 0 ); - private final AtomicLong connectionsWritten = new AtomicLong( 0 ); - private final AtomicLong connectionsFailed = new AtomicLong( 0 ); - private final AtomicInteger cachedOperations = new AtomicInteger( 0 ); - - private final Semaphore writeSemaphore = new Semaphore( 1 ); - - private final FileImport fileImport; - private final EntityManagerFactory emf; - private final int flushCount; - - - /** - * Create an instance to track counters. Note that when this instance is created, it will - * attempt to load it's state from the entity manager. In the case of using this when resuming, - * be sure you begin processing where the system thinks * it has left off. - * - * @param emf Entity Manager Factory - * @param fileImport File Import Entity - * @param flushCount The number of success + failures to accumulate before flushing - */ - public FileImportTracker( - final EntityManagerFactory emf, final FileImport fileImport, final int flushCount ) { - - this.emf = emf; - this.flushCount = flushCount; - this.fileImport = fileImport; - - this.entitiesWritten.addAndGet( fileImport.getImportedEntityCount() ); - this.entitiesFailed.addAndGet( fileImport.getFailedEntityCount() ); - - this.connectionsWritten.addAndGet( fileImport.getImportedConnectionCount() ); - this.connectionsFailed.addAndGet( fileImport.getFailedConnectionCount() ); - } - - - /** - * Invoke when an entity has been successfully written - */ - public void entityWritten() { - entitiesWritten.incrementAndGet(); - maybeFlush(); - } - - - /** - * Invoke when an entity fails to write correctly - */ - - public void entityFailed( final String message ) { - entitiesFailed.incrementAndGet(); - - FailedImportEntity failedImportEntity = new FailedImportEntity(); - failedImportEntity.setErrorMessage( message ); - - try { - EntityManager entityManager = emf.getEntityManager(emf.getManagementAppId()); - failedImportEntity = entityManager.create( failedImportEntity ); - entityManager.createConnection( fileImport, ERRORS_CONNECTION_NAME, failedImportEntity ); - } - catch ( Exception e ) { - throw new PersistenceException( "Unable to save failed entity import message", e ); - } - maybeFlush(); - } - - - /** - * Invoked when a connection is written - */ - public void connectionWritten() { - connectionsWritten.incrementAndGet(); - maybeFlush(); - } - - - /** - * Invoked when a connection cannot be written - */ - public void connectionFailed( final String message ) { - connectionsFailed.incrementAndGet(); - - - FailedImportConnection failedImportConnection = new FailedImportConnection(); - failedImportConnection.setErrorMessage( message ); - - try { - EntityManager entityManager = emf.getEntityManager(emf.getManagementAppId()); - failedImportConnection = entityManager.create( failedImportConnection ); - entityManager.createConnection( fileImport, ERRORS_CONNECTION_NAME, failedImportConnection ); - } - catch ( Exception e ) { - throw new PersistenceException( "Unable to save failed entity import message", e ); - } - maybeFlush(); - } - - - /** - * Invoke when the file is completed processing - */ - public void complete() { - - final long failed = entitiesFailed.get() + connectionsFailed.get(); - - final FileImport.State state; - final String message; - - if ( failed > 0 ) { - state = FileImport.State.FAILED; - message = fileImport.getErrorMessage() == null ? ERROR_MESSAGE : fileImport.getErrorMessage(); - } - else { - state = FileImport.State.FINISHED; - message = null; - } - - updateFileImport( state, message ); - } - - - /** - * Invoke when we halt the import with a fatal error that cannot be recovered. - */ - public void fatal( final String message ) { - - updateFileImport( FileImport.State.FAILED, message ); - } - - - /** - * Return the total number of successful imports + failed imports. - * Can be used in resume. Note that this reflects the counts last written - * to cassandra when this instance was created + any processing - */ - public long getTotalEntityCount() { - return getEntitiesWritten() + getEntitiesFailed(); - } - - - /** - * Get the total number of failed + successful connections - * @return - */ - public long getTotalConnectionCount(){ - return getConnectionsFailed() + getConnectionsWritten(); - } - - - /** - * Returns true if we should stop processing. We use fail fast logic, so after the first - * failure this will return true. - */ - public boolean shouldStopProcessingEntities() { - return entitiesFailed.get() > 0; - } - - - /** - * Returns true if we should stop processing. We use fail fast logic, so after the first - * failure this will return true. - */ - public boolean shouldStopProcessingConnections() { - return connectionsFailed.get() > 0; - } - - /** - * Get the number of entities written - * @return - */ - public long getEntitiesWritten() { - return entitiesWritten.get(); - } - - - /** - * Get the number of failed entities - * @return - */ - public long getEntitiesFailed() { - return entitiesFailed.get(); - } - - - /** - * Get the number of connections written - * @return - */ - public long getConnectionsWritten() { - return connectionsWritten.get(); - } - - - /** - * Get the number of connections failed - * @return - */ - public long getConnectionsFailed() { - return connectionsFailed.get(); - } - - private void maybeFlush() { - final int count = cachedOperations.incrementAndGet(); - - //no op - if ( count < flushCount ) { - return; - } - - //another thread is writing, no op, just return - if ( !writeSemaphore.tryAcquire() ) { - return; - } - - final long failed = entitiesFailed.get(); - final long written = entitiesWritten.get(); - final String message; - - if ( failed > 0 ) { - message = "Failed to import " + failed - + " entities. Successfully imported " + written + " entities"; - } - else { - message = "Successfully imported " + written + " entities"; - } - - updateFileImport( FileImport.State.STARTED, message ); - cachedOperations.addAndGet( flushCount * -1 ); - writeSemaphore.release(); - } - - - /** - * Update the file import status with the provided messages - * - * @param state The state to set into the import - * @param message The message to set - */ - private void updateFileImport( final FileImport.State state, final String message ) { - - try { - - - final long writtenEntities = entitiesWritten.get(); - final long failedEntities = entitiesFailed.get(); - - final long writtenConnections = connectionsWritten.get(); - final long failedConnections = connectionsFailed.get(); - - - fileImport.setImportedEntityCount( writtenEntities ); - fileImport.setFailedEntityCount( failedEntities ); - - fileImport.setImportedConnectionCount( writtenConnections ); - fileImport.setFailedConnectionCount( failedConnections ); - - - fileImport.setState( state ); - fileImport.setErrorMessage( message ); - - EntityManager entityManager = emf.getEntityManager(emf.getManagementAppId()); - entityManager.update( fileImport ); - } - catch ( Exception e ) { - throw new RuntimeException( "Unable to persist complete state", e ); - } - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java deleted file mode 100644 index 935f12c..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.management.importer; - -import org.apache.usergrid.batch.JobExecution; -import org.apache.usergrid.batch.job.OnlyOnceJob; - -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.entities.FileImport; -import org.apache.usergrid.persistence.entities.Import; -import org.apache.usergrid.persistence.entities.JobData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.UUID; - - -@Component("importJob") -public class ImportJob extends OnlyOnceJob { - - public static final String IMPORT_ID = "importId"; - private static final Logger logger = LoggerFactory.getLogger(ImportJob.class); - - @Autowired - protected EntityManagerFactory emf; - - @Autowired - ImportService importService; - - public ImportJob(){ - if (logger.isTraceEnabled()) { - logger.info("ImportJob created"); - } - } - - @Override - protected void doJob(JobExecution jobExecution) throws Exception { - logger.info( "execute ImportJob {}", jobExecution.getJobId().toString() ); - - try { - JobData jobData = jobExecution.getJobData(); - if (jobData == null) { - logger.error("jobData cannot be null"); - return; - } - - // heartbeat to indicate job has started - jobExecution.heartbeat(); - - // call the doImport method from import service which - // schedules the sub-jobs i.e. parsing of files to FileImport Job - importService.doImport(jobExecution); - - } catch ( Throwable t ) { - logger.error("Error calling in importJob", t); - - // update import job record - UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID); - EntityManager mgmtApp = emf.getEntityManager(emf.getManagementAppId()); - Import importEntity = mgmtApp.get(importId, Import.class); - importEntity.setState(Import.State.FAILED); - importEntity.setErrorMessage(t.getMessage()); - mgmtApp.update(importEntity); - - throw t; - } - - if (logger.isTraceEnabled()) { - logger.trace("Import Service completed job"); - } - } - - @Override - protected long getDelay(JobExecution execution) throws Exception { - return 100; - } - - @Autowired - public void setImportService( final ImportService importService ) { - this.importService = importService; - } - - - /** - * This method is called when the job is retried maximum times by the - * scheduler but still fails. Thus the scheduler marks it as DEAD. - */ - @Override - public void dead( final JobExecution execution ) throws Exception { - - // marks the job as failed as it will not be retried by the scheduler. - EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId()); - Import importUG = importService.getImportEntity(execution); - importUG.setErrorMessage("The Job has been tried maximum times but still failed"); - importUG.setState(Import.State.FAILED); - rootEm.update(importUG); - - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/cb1356c2/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java deleted file mode 100644 index 0fd4092..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.management.importer; - - -import org.apache.usergrid.batch.JobExecution; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.entities.FailedImportEntity; -import org.apache.usergrid.persistence.entities.FileImport; -import org.apache.usergrid.persistence.entities.Import; - -import java.util.Map; -import java.util.UUID; - - -/** - * Performs all functions related to importing. - */ -public interface ImportService { - - /** - * Schedules the import to execute - */ - Import schedule( final UUID applicationId, Map<String, Object> json ) throws Exception; - - /** - * Get the imports results for the application - * @param applicationId - * @param ql The query executed (nullable) - * @param cursor The cursor passed (nullable) - * @return - */ - Results getImports(final UUID applicationId, final String ql, final String cursor); - - /** - * Get the import - * @param applicationId - * @param importId - * @return - */ - Import getImport(final UUID applicationId, final UUID importId); - - /** - * Get the results - * - * @param applicationId The applicationId - * @param importId The import id to get files from - * @param ql The query executed (nullable) - * @param cursor The cursor passed (nullable) - */ - Results getFileImports(final UUID applicationId, final UUID importId, final String ql, final String cursor); - - /** - * Get the results - * - * @param applicationId The applicationId - * @param importId The import id to get files from - * - * @return The FileImport - */ - FileImport getFileImport(final UUID applicationId, final UUID importId, final UUID fileImportId); - - - /** - * Get the results of failed imports - * - * - * @param applicationId The applicationId - * @param importId The import id to get files from - * @param ql The query executed (nullable) - * @param cursor The cursor passed (nullable) - */ - Results getFailedImportEntities(final UUID applicationId, final UUID importId, final UUID fileImportId, final String ql, final String cursor); - - /** - * Get the failedimport entity from it's parentId - * @param applicationId - * @param importId - * @param fileImportId - * @param failedImportId - * @return - */ - FailedImportEntity getFailedImportEntity(final UUID applicationId, final UUID importId, final UUID fileImportId, final UUID failedImportId); - - /** - * Perform the import from the external resource - */ - void doImport(JobExecution jobExecution) throws Exception; - - /** - * Parses the input file and creates entities - */ - void downloadAndImportFile(JobExecution jobExecution) throws Exception; - - /** - * Get the state for the Job with UUID - * @param uuid Job UUID - * @return State of Job - */ - Import.State getState( UUID uuid ) throws Exception; - - /** - * Returns error message for the job with UUID - * @param uuid Job UUID - */ - String getErrorMessage(UUID uuid) throws Exception; - - /** - * @return FileImportEntity - */ - FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception; - - /** - * @param jobExecution - * @return ImportEntity - */ - Import getImportEntity(final JobExecution jobExecution) throws Exception; -}
