eribeiro commented on a change in pull request #864: SOLR-13101 : Shared storage support in SolrCloud URL: https://github.com/apache/lucene-solr/pull/864#discussion_r323972376
########## File path: solr/core/src/java/org/apache/solr/store/blob/client/S3StorageClient.java ##########
@@ -0,0 +1,385 @@
+package org.apache.solr.store.blob.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.util.FileUtils;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.google.common.collect.Iterables;
+
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.BlobClientUtils;
+import org.apache.solr.store.blob.client.ToFromJson;
+
+/**
+ * This class implements an AmazonS3 client for reading and writing search index
+ * data to AWS S3.
+ */
+public class S3StorageClient implements CoreStorageClient {
+
+  private final AmazonS3 s3Client;
+
+  /** The S3 bucket where we write all of our blobs to */
+  private final String blobBucketName;
+
+  // S3 has a hard limit of 1000 keys per batch delete request
+  private static final int MAX_KEYS_PER_BATCH_DELETE = 1000;
+
+  /**
+   * Construct a new S3StorageClient that is an implementation of the
+   * CoreStorageClient using AWS S3 as the underlying blob store service provider.
+   */
+  public S3StorageClient() throws IOException {
+    String credentialsFilePath = AmazonS3Configs.CREDENTIALS_FILE_PATH.getValue();
+
+    // requires credentials file on disk to authenticate with S3
+    if (!FileUtils.fileExists(credentialsFilePath)) {
+      throw new IOException("Credentials file does not exist in " + credentialsFilePath);
+    }
+
+    /*
+     * default s3 client builder loads credentials from disk and handles token refreshes
+     */
+    AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+    s3Client = builder
+        .withPathStyleAccessEnabled(true)
+        .withRegion(Regions.fromName(AmazonS3Configs.REGION.getValue()))
+        .build();
+
+    blobBucketName = AmazonS3Configs.BUCKET_NAME.getValue();
+  }
+
+  @Override
+  public void pushCoreMetadata(String sharedStoreName, String blobCoreMetadataName, BlobCoreMetadata bcm)
+      throws BlobException {
+    try {
+      ToFromJson<BlobCoreMetadata> converter = new ToFromJson<>();
+      String json = converter.toJson(bcm);
+
+      String blobCoreMetadataPath = getBlobMetadataPath(sharedStoreName, blobCoreMetadataName);
+      /*
+       * Encodes contents of the string into an S3 object.
If no exception is thrown
+       * then the object is guaranteed to have been stored
+       */
+      s3Client.putObject(blobBucketName, blobCoreMetadataPath, json);
+    } catch (AmazonServiceException ase) {
+      throw handleAmazonServiceException(ase);
+    } catch (AmazonClientException ace) {
+      throw new BlobClientException(ace);
+    } catch (Exception ex) {
+      throw new BlobException(ex);
+    }
+  }
+
+  @Override
+  public BlobCoreMetadata pullCoreMetadata(String sharedStoreName, String blobCoreMetadataName) throws BlobException {
+    try {
+      String blobCoreMetadataPath = getBlobMetadataPath(sharedStoreName, blobCoreMetadataName);
+
+      if (!coreMetadataExists(sharedStoreName, blobCoreMetadataName)) {
+        return null;
+      }
+
+      String decodedJson = s3Client.getObjectAsString(blobBucketName, blobCoreMetadataPath);
+      ToFromJson<BlobCoreMetadata> converter = new ToFromJson<>();
+      return converter.fromJson(decodedJson, BlobCoreMetadata.class);
+    } catch (AmazonServiceException ase) {
+      throw handleAmazonServiceException(ase);
+    } catch (AmazonClientException ace) {
+      throw new BlobClientException(ace);
+    } catch (Exception ex) {
+      throw new BlobException(ex);
+    }
+  }
+
+  @Override
+  public InputStream pullStream(String path) throws BlobException {
+    try {
+      S3Object requestedObject = s3Client.getObject(blobBucketName, path);
+      // This InputStream instance needs to be closed by the caller
+      return requestedObject.getObjectContent();
+    } catch (AmazonServiceException ase) {
+      throw handleAmazonServiceException(ase);
+    } catch (AmazonClientException ace) {
+      throw new BlobClientException(ace);
+    } catch (Exception ex) {
+      throw new BlobException(ex);
+    }
+  }
+
+  @Override
+  public String pushStream(String blobName, InputStream is, long contentLength, String fileNamePrefix)
+      throws BlobException {
+    try {
+      /*
+       * This object metadata is associated per blob. This is different than the Solr
+       * Core metadata {@link BlobCoreMetadata} which sits as a separate blob object
+       * in the store.
At minimum, ObjectMetadata requires the content length of the
+       * object to be set in the request header.
+       */
+      ObjectMetadata objectMetadata = new ObjectMetadata();
+      objectMetadata.setContentLength(contentLength);
+
+      String blobPath = BlobClientUtils.generateNewBlobCorePath(blobName, fileNamePrefix);
+      PutObjectRequest putRequest = new PutObjectRequest(blobBucketName, blobPath, is, objectMetadata);
+
+      s3Client.putObject(putRequest);
+      is.close();

Review comment:
   Suggestion: put this in a `finally` block.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
For additional commands, e-mail: dev-h...@lucene.apache.org