Use ExecutorService to limit the number of threads used for uploads (default is 40), and add a property to set the maximum uploaded file size (default is 50 MB).
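For reference, these are the two settings the commit introduces, shown with their defaults as added to usergrid-default.properties below (either can be overridden in a deployment's own properties file):

    # maximum accepted size for an uploaded asset, in megabytes
    usergrid.binary.max-size-mb=50

    # number of worker threads used for large-file S3 uploads
    usergrid.binary.upload-workers=40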
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/49ae4ac5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/49ae4ac5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/49ae4ac5

Branch: refs/heads/two-dot-o-dev
Commit: 49ae4ac5b8d5d77e90e6e6c6e9d8b299a5423863
Parents: 6f90eba
Author: Dave Johnson <dmjohn...@apigee.com>
Authored: Tue May 19 10:32:42 2015 -0400
Committer: Dave Johnson <dmjohn...@apigee.com>
Committed: Tue May 19 10:32:42 2015 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties |   2 +
 .../applications/assets/AssetResourceIT.java   |  68 +++++-
 .../src/test/resources/cat-larger-than-6mb.jpg | Bin 0 -> 9799257 bytes
 .../services/assets/data/S3BinaryStore.java    | 215 +++++++++++++------
 4 files changed, 216 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d653b7e..0a4f218 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -160,6 +160,8 @@ swagger.basepath=http://localhost:8080
 AWS_ACCESS_KEY_ID=
 AWS_SECRET_KEY=
 usergrid.binary.bucketname=usergrid-test
+usergrid.binary.max-size-mb=50
+usergrid.binary.upload-workers=40
 usergrid.test.sample_data_url=


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
index 666f95e..c3cab63 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.rest.applications.assets;

 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
@@ -40,10 +41,9 @@ import org.apache.commons.io.IOUtils;

 import com.sun.jersey.multipart.FormDataMultiPart;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.apache.usergrid.management.AccountCreationProps.PROPERTIES_ADMIN_USERS_REQUIRE_CONFIRMATION;
 import static org.apache.usergrid.utils.MapUtils.hashMap;
+import static org.junit.Assert.*;


 @Concurrent()
@@ -249,12 +249,11 @@ public class AssetResourceIT extends AbstractRestIT {
         node = resource().path( "/test-organization/test-app/foos/" + uuid ).queryParam( "access_token", access_token )
                 .accept( MediaType.APPLICATION_JSON ).type( MediaType.MULTIPART_FORM_DATA ).put( JsonNode.class, form );
         logNode( node );
-        Assert.assertTrue( lastModified != node.findValue( AssetUtils.LAST_MODIFIED ).getLongValue() );
+        assertTrue( lastModified != node.findValue( AssetUtils.LAST_MODIFIED ).getLongValue() );
     }


     @Test
-    @Ignore("Just enable and run when testing S3 large file upload specifically")
     public void largeFileInS3() throws Exception {

         UserRepo.INSTANCE.load( resource(), access_token );
@@ -302,6 +301,63 @@ public class AssetResourceIT extends AbstractRestIT {
                 .accept( MediaType.APPLICATION_JSON_TYPE ).delete( JsonNode.class );
     }

+    @Test
+    public void fileTooLargeShouldResultInError() throws Exception {
+
+        Map<String, String> props = new HashMap<String, String>();
+        props.put( "usergrid.binary.max-size-mb", "6" );
+        resource().path( "/testproperties" )
+            .queryParam( "access_token", access_token )
+            .accept( MediaType.APPLICATION_JSON )
+            .type( MediaType.APPLICATION_JSON_TYPE ).post( props );
+
+        try {
+
+            UserRepo.INSTANCE.load( resource(), access_token );
+
+            byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/cat-larger-than-6mb.jpg" ) );
+            FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
+
+            // send data
+            JsonNode node = resource().path( "/test-organization/test-app/bars" ).queryParam( "access_token", access_token )
+                .accept( MediaType.APPLICATION_JSON ).type( MediaType.MULTIPART_FORM_DATA )
+                .post( JsonNode.class, form );
+            //logNode( node );
+            JsonNode idNode = node.get( "entities" ).get( 0 ).get( "uuid" );
+            String uuid = idNode.getTextValue();
+
+            // get entity
+            String errorMessage = null;
+            long timeout = System.currentTimeMillis() + 60000;
+            while (true) {
+                LOG.info( "Waiting for upload to finish..." );
+                Thread.sleep( 2000 );
+                node = resource().path( "/test-organization/test-app/bars/" + uuid )
+                    .queryParam( "access_token", access_token ).accept( MediaType.APPLICATION_JSON_TYPE )
+                    .get( JsonNode.class );
+                //logNode( node );
+
+                // poll for the error to happen
+                if (node.findValue( "error" ) != null) {
+                    errorMessage = node.findValue("error").asText();
+                    break;
+                }
+                if (System.currentTimeMillis() > timeout) {
+                    throw new TimeoutException();
+                }
+            }
+
+            assertTrue( errorMessage.startsWith("Asset size ") );
+
+        } finally {
+            props = new HashMap<String, String>();
+            props.put( "usergrid.binary.max-size-mb", "25" );
+            resource().path( "/testproperties" )
+                .queryParam( "access_token", access_token )
+                .accept( MediaType.APPLICATION_JSON )
+                .type( MediaType.APPLICATION_JSON_TYPE ).post( props );
+        }
+    }

     /**
      * Deleting a connection to an asset should not delete the asset or the asset's data
      */
@@ -317,7 +373,7 @@

         Map<String, String> payload = hashMap("name", "cassandra_eye.jpg");

-        JsonNode node = resource().path("/test-organization/test-app/foos")
+        JsonNode node = resource().path("/test-organization/test-app/bars")
                 .queryParam("access_token", access_token)
                 .accept(MediaType.APPLICATION_JSON)
                 .type(MediaType.APPLICATION_JSON_TYPE)


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/rest/src/test/resources/cat-larger-than-6mb.jpg
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/cat-larger-than-6mb.jpg b/stack/rest/src/test/resources/cat-larger-than-6mb.jpg
new file mode 100644
index 0000000..d45435a
Binary files /dev/null and b/stack/rest/src/test/resources/cat-larger-than-6mb.jpg differ


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
index 29b5e47..f59e79c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;

+import org.apache.usergrid.utils.StringUtils;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.AsyncBlobStore;
 import org.jclouds.blobstore.BlobStore;
@@ -63,12 +65,16 @@ public class S3BinaryStore implements BinaryStore {

     private static final Logger LOG = LoggerFactory.getLogger( S3BinaryStore.class );
     private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 );
+    private static String WORKERS_PROP_NAME = "usergrid.binary.upload-workers";

     private BlobStoreContext context;
     private String accessId;
     private String secretKey;
     private String bucketName;
-    private ExecutorService executor = Executors.newFixedThreadPool( 10 );
+    private ExecutorService executorService;
+
+    @Autowired
+    private Properties properties;

     @Autowired
     private EntityManagerFactory emf;
@@ -104,17 +110,19 @@ public class S3BinaryStore implements BinaryStore {

     @Override
     public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws IOException {

-        final String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+        // write up to 5mb of data to a byte array
+
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );
         byte[] data = baos.toByteArray();

-        final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
-        fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
+        if ( written < FIVE_MB ) { // total smaller than 5mb

-        final String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
+            final String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+            final String mimeType = AssetMimeHandler.get().getMimeType( entity, data );

-        if ( written < FIVE_MB ) { // total smaller than 5mb
+            final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
+            fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );

             BlobStore blobStore = getContext().getBlobStore();
             BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
@@ -134,69 +142,31 @@
         }
         else { // bigger than 5mb... dump 5 mb tmp files and upload from them

-            // create temp file and copy entire file to that temp file
-
-            LOG.debug( "Writing temp file for S3 upload" );
-
-            final File tempFile = File.createTempFile( entity.getUuid().toString(), "tmp" );
-            tempFile.deleteOnExit();
-            OutputStream os = null;
-            try {
-                os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );
-                os.write( data );
-                written += IOUtils.copyLarge( inputStream, os, 0, ( FileUtils.ONE_GB * 5 ) );
-            }
-            finally {
-                IOUtils.closeQuietly( os );
-            }
-
-            fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
-
-            // JClouds no longer supports async blob store, so we have to do this fun stuff
-
-            LOG.debug( "Starting upload thread" );
-
-            Thread uploadThread = new Thread( new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.debug( "S3 upload thread started" );
+            ExecutorService executors = getExecutorService();

-                        BlobStore blobStore = getContext().getBlobStore();
-
-                        BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
-                                .payload( tempFile ).calculateMD5().contentType( mimeType );
-
-                        if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
-                            bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
-                        }
-                        final Blob blob = bb.build();
+            executors.submit( new UploadWorker( appId, entity, inputStream, data, written ) );
+        }
+    }

-                        String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
-                        fileMetadata.put( AssetUtils.CHECKSUM, md5sum );

-                        LOG.debug( "S3 upload starting" );
+    private ExecutorService getExecutorService() {

-                        String eTag = blobStore.putBlob( bucketName, blob );
-                        fileMetadata.put( AssetUtils.E_TAG, eTag );
+        if ( executorService == null ) {
+            synchronized (this) {

-                        LOG.debug( "S3 upload complete eTag=" + eTag);
+                int workers = 40;
+                String workersString = properties.getProperty( WORKERS_PROP_NAME, "40");

-                        EntityManager em = emf.getEntityManager( appId );
-                        em.update( entity );
-                        tempFile.delete();
-                    }
-                    catch ( Exception e ) {
-                        LOG.error( "error uploading", e );
-                    }
-                    if ( tempFile != null && tempFile.exists() ) {
-                        tempFile.delete();
-                    }
+                if ( StringUtils.isNumeric( workersString ) ) {
+                    workers = Integer.parseInt( workersString );
+                } else if ( !StringUtils.isEmpty( workersString )) {
+                    LOG.error("Ignoring invalid setting for {}", WORKERS_PROP_NAME);
                 }
-            });
-
-            uploadThread.start();
+                executorService = Executors.newFixedThreadPool( workers );
+            }
         }
+
+        return executorService;
     }
@@ -229,5 +199,124 @@
         BlobStore blobStore = getContext().getBlobStore();
         blobStore.removeBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) );
     }
+
+    class UploadWorker implements Callable<Void> {
+
+        private UUID appId;
+        private Entity entity;
+        private InputStream inputStream;
+        private byte[] data;
+        private long written;
+
+
+        public UploadWorker( UUID appId, Entity entity, InputStream is, byte[] data, long written ) {
+            this.appId = appId;
+            this.entity = entity;
+            this.inputStream = is;
+            this.data = data;
+            this.written = written;
+        }
+
+        @Override
+        public Void call() {
+
+            LOG.debug( "Writing temp file for S3 upload" );
+
+            // determine max file size allowed, default to 50mb
+            long maxSizeBytes = 50 * FileUtils.ONE_MB;
+            String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
+            if (StringUtils.isNumeric( maxSizeMbString )) {
+                maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
+            }
+
+            // always allow files up to 5mb
+            if (maxSizeBytes < 5 * FileUtils.ONE_MB ) {
+                maxSizeBytes = 5 * FileUtils.ONE_MB;
+            }
+
+            // write temporary file, slightly larger than our size limit
+            OutputStream os = null;
+            File tempFile;
+            try {
+                tempFile = File.createTempFile( entity.getUuid().toString(), "tmp" );
+                tempFile.deleteOnExit();
+                os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );
+                os.write( data );
+                written += IOUtils.copyLarge( inputStream, os, 0, maxSizeBytes + 1 );
+
+            } catch ( IOException e ) {
+                throw new RuntimeException( "Error creating temp file", e );
+
+            } finally {
+                if ( os != null ) {
+                    IOUtils.closeQuietly( os );
+                }
+            }
+
+            // if tempFile is too large, delete it, add error to entity file metadata and abort
+
+            Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
+
+            if ( tempFile.length() > maxSizeBytes ) {
+                LOG.debug("File too large. Temp file size (bytes) = {}, "
+                        + "Max file size (bytes) = {} ", tempFile.length(), maxSizeBytes);
+                try {
+                    EntityManager em = emf.getEntityManager( appId );
+                    fileMetadata.put( "error", "Asset size " + tempFile.length()
+                            + " is larger than max size of " + maxSizeBytes );
+                    em.update( entity );
+                    tempFile.delete();
+
+                } catch ( Exception e ) {
+                    LOG.error( "Error updating entity with error message", e);
+                }
+                return null;
+            }
+
+            String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+            String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
+
+            try { // start the upload
+
+                LOG.debug( "S3 upload thread started" );
+
+                BlobStore blobStore = getContext().getBlobStore();
+
+                BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
+                        .payload( tempFile ).calculateMD5().contentType( mimeType );
+
+                if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
+                    bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
+                }
+                final Blob blob = bb.build();
+
+                String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
+                fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
+
+                LOG.debug( "S3 upload starting" );
+
+                String eTag = blobStore.putBlob( bucketName, blob );
+
+                LOG.debug( "S3 upload complete eTag=" + eTag);
+
+                // update entity with information about uploaded asset
+
+                EntityManager em = emf.getEntityManager( appId );
+                fileMetadata.put( AssetUtils.E_TAG, eTag );
+                fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
+                fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
+                em.update( entity );
+            }
+            catch ( Exception e ) {
+                LOG.error( "error uploading", e );
+            }
+
+            if ( tempFile != null && tempFile.exists() ) {
+                tempFile.delete();
+            }
+
+            return null;
+        }
+    }
 }
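For readers skimming the diff, here is a minimal, self-contained sketch of the lazy, property-sized worker pool that getExecutorService() above implements. The class name UploadPoolExample and the main() harness are illustrative only, not part of the commit; the sketch also uses plain JDK number parsing in place of the usergrid StringUtils helper, and synchronizes the whole accessor rather than reproducing the commit's check-outside-the-lock form.

    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class UploadPoolExample {

        // same property name the commit reads
        private static final String WORKERS_PROP_NAME = "usergrid.binary.upload-workers";
        private static final int DEFAULT_WORKERS = 40;

        private final Properties properties;
        private ExecutorService executorService;

        public UploadPoolExample( Properties properties ) {
            this.properties = properties;
        }

        // Lazily create the pool, sized from the property (default 40);
        // an unparseable value falls back to the default with a warning.
        public synchronized ExecutorService getExecutorService() {
            if ( executorService == null ) {
                int workers = DEFAULT_WORKERS;
                String workersString = properties.getProperty( WORKERS_PROP_NAME, "40" );
                try {
                    workers = Integer.parseInt( workersString );
                } catch ( NumberFormatException e ) {
                    System.err.println( "Ignoring invalid setting for " + WORKERS_PROP_NAME );
                }
                executorService = Executors.newFixedThreadPool( workers );
            }
            return executorService;
        }

        public static void main( String[] args ) {
            Properties props = new Properties();
            props.setProperty( WORKERS_PROP_NAME, "8" );

            UploadPoolExample example = new UploadPoolExample( props );
            example.getExecutorService().submit(
                () -> System.out.println( "upload worker ran" ) );
            example.getExecutorService().shutdown();
        }
    }

A fixed pool bounds concurrent S3 uploads at the configured worker count; tasks submitted beyond that simply queue inside the executor, which is what replaces the one-thread-per-upload behavior removed in the diff above.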