This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch branch-feature-logsearch-ga in repository https://gitbox.apache.org/repos/asf/ambari.git
commit df10aa765d8a2b7cc433023200885aa79d3deb3d Author: Oliver Szabo <[email protected]> AuthorDate: Sun Sep 23 16:00:24 2018 +0200 Get rid of AWS sdk --- .../model/inputconfig/InputS3FileDescriptor.java | 3 + .../impl/InputS3FileDescriptorImpl.java | 18 ++++ .../ambari-logsearch-logfeeder/pom.xml | 11 +-- .../apache/ambari/logfeeder/input/InputS3File.java | 7 +- .../ambari/logfeeder/output/OutputS3File.java | 4 +- .../logfeeder/output/S3OutputConfiguration.java | 10 ++- .../apache/ambari/logfeeder/output/S3Uploader.java | 18 +--- .../org/apache/ambari/logfeeder/util/S3Util.java | 95 ++++++++++------------ .../model/common/LSServerInputS3File.java | 12 +++ 9 files changed, 99 insertions(+), 79 deletions(-) diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java index b075629..9886793 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java @@ -20,6 +20,9 @@ package org.apache.ambari.logsearch.config.api.model.inputconfig; public interface InputS3FileDescriptor extends InputFileBaseDescriptor { + + String getS3Endpoint(); + String getS3AccessKey(); String getS3SecretKey(); diff --git a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java index 527dae8..5d2c19c 100644 --- a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java +++ b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java @@ -49,6 +49,15 @@ public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl imple @SerializedName("s3_secret_key") private String s3SecretKey; + @ShipperConfigElementDescription( + path = "/input/[]/s3_endpoint", + type = "string", + description = "Endpoint URL for S3." + ) + @Expose + @SerializedName("s3_endpoint") + private String s3Endpoint; + @Override public String getS3AccessKey() { return s3AccessKey; @@ -66,4 +75,13 @@ public class InputS3FileDescriptorImpl extends InputFileBaseDescriptorImpl imple public void setS3SecretKey(String s3SecretKey) { this.s3SecretKey = s3SecretKey; } + + @Override + public String getS3Endpoint() { + return s3Endpoint; + } + + public void setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index bdf3928..6dd3905 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -145,14 +145,9 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-core</artifactId> - <version>1.11.412</version> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.11.412</version> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <version>5.0.1</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index 274183c..c4d5fb9 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.logfeeder.input; +import org.apache.ambari.logfeeder.output.S3OutputConfiguration; import org.apache.ambari.logfeeder.util.S3Util; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor; import org.apache.commons.lang.ArrayUtils; @@ -82,7 +83,11 @@ public class InputS3File extends InputFile { public BufferedReader openLogFile(File logPathFile) throws Exception { String s3AccessKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey(); String s3SecretKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey(); - BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey); + String s3Endpoint = ((InputS3FileDescriptor)getInputDescriptor()).getS3Endpoint(); + if (s3Endpoint == null) { + s3Endpoint = S3OutputConfiguration.DEFAULT_S3_ENDPOINT; + } + BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3Endpoint, s3AccessKey, s3SecretKey); Object fileKey = getFileKey(logPathFile); setFileKey(fileKey); String base64FileKey = Base64.byteArrayToBase64(getFileKey().toString().getBytes()); diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index fb13010..38a2937 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -130,8 +130,8 @@ public class OutputS3File extends OutputFile implements RolloverCondition, Rollo String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster()); - S3Util.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey, - s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey()); + S3Util.writeDataIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey, + s3OutputConfiguration.getS3Endpoint(), s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey()); } private String getComponentConfigFileName(String componentName) { diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java index a2d7692..293f011 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java @@ -37,6 +37,8 @@ public class S3OutputConfiguration { public static final String S3_LOG_DIR_KEY = "s3_log_dir"; public static final String S3_ACCESS_KEY = "s3_access_key"; public static final String S3_SECRET_KEY = "s3_secret_key"; + public static final String S3_ENDPOINT = "s3_endpoint"; + public static final String DEFAULT_S3_ENDPOINT = "https://s3.amazonaws.com"; public static final String COMPRESSION_ALGO_KEY = "compression_algo"; public static final String ADDITIONAL_FIELDS_KEY = "add_fields"; public static final String CLUSTER_KEY = "cluster"; @@ -51,6 +53,10 @@ public class S3OutputConfiguration { return (String) configs.get(S3_BUCKET_NAME_KEY); } + public String getS3Endpoint() { + return (String) configs.getOrDefault(S3_ENDPOINT, DEFAULT_S3_ENDPOINT); + } + public String getS3Path() { return (String) configs.get(S3_LOG_DIR_KEY); } @@ -84,7 +90,7 @@ public class S3OutputConfiguration { Map<String, Object> configs = new HashMap<>(); String[] stringValuedKeysToCopy = new String[] { SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY, - S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY + S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY, S3_ENDPOINT }; for (String key : stringValuedKeysToCopy) { @@ -108,6 +114,8 @@ public class S3OutputConfiguration { configs.put(ADDITIONAL_FIELDS_KEY, configItem.getNVList(ADDITIONAL_FIELDS_KEY)); + configs.putIfAbsent(S3_ENDPOINT, DEFAULT_S3_ENDPOINT); + return new S3OutputConfiguration(configs); } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java index bbd0cef..9e0f8b8 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java @@ -18,8 +18,6 @@ package org.apache.ambari.logfeeder.output; -import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.Upload; import com.google.common.annotations.VisibleForTesting; import org.apache.ambari.logfeeder.common.LogFeederConstants; import org.apache.ambari.logfeeder.util.CompressionUtil; @@ -129,6 +127,7 @@ public class S3Uploader implements Runnable { String s3AccessKey = s3OutputConfiguration.getS3AccessKey(); String s3SecretKey = s3OutputConfiguration.getS3SecretKey(); String compressionAlgo = s3OutputConfiguration.getCompressionAlgo(); + String s3Endpoint = s3OutputConfiguration.getS3Endpoint(); String keySuffix = fileToUpload.getName() + "." + compressionAlgo; String s3Path = new S3LogPathResolver().getResolvedPath( @@ -138,7 +137,7 @@ public class S3Uploader implements Runnable { File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo); logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path); - uploadFileToS3(bucketName, s3Path, sourceFile, s3AccessKey, s3SecretKey); + S3Util.writeFileIntoS3File(sourceFile.getAbsolutePath(), bucketName, s3Path, s3Endpoint, s3AccessKey, s3SecretKey); // delete local compressed file sourceFile.delete(); @@ -152,19 +151,6 @@ public class S3Uploader implements Runnable { } @VisibleForTesting - protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) { - TransferManager transferManager = S3Util.getTransferManager(accessKey, secretKey); - try { - Upload upload = transferManager.upload(bucketName, s3Key, localFile); - upload.waitForUploadResult(); - } catch (InterruptedException e) { - logger.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), e); - } finally { - S3Util.shutdownTransferManager(transferManager); - } - } - - @VisibleForTesting protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() + "." + compressionAlgo); diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java index aa3aa4c..632594b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java @@ -19,23 +19,18 @@ package org.apache.ambari.logfeeder.util; import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.zip.GZIPInputStream; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import io.minio.MinioClient; +import io.minio.errors.InvalidEndpointException; +import io.minio.errors.InvalidPortException; import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.commons.io.IOUtils; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.transfer.TransferManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,19 +44,8 @@ public class S3Util { throw new UnsupportedOperationException(); } - public static AmazonS3 getS3Client(String accessKey, String secretKey) { - AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient(); - return AmazonS3ClientBuilder.defaultClient(); - } - - public static TransferManager getTransferManager(String accessKey, String secretKey) { - return new TransferManager(); - } - - public static void shutdownTransferManager(TransferManager transferManager) { - if (transferManager != null) { - transferManager.shutdownNow(); - } + public static MinioClient getS3Client(String endpoint, String accessKey, String secretKey) throws InvalidPortException, InvalidEndpointException { + return new MinioClient(endpoint, accessKey, secretKey); } public static String getBucketName(String s3Path) { @@ -93,49 +77,58 @@ public class S3Util { /** * Get the buffer reader to read s3 file as a stream */ - public static BufferedReader getReader(String s3Path, String accessKey, String secretKey) throws IOException { + public static BufferedReader getReader(String s3Path, String s3Endpoint, String accessKey, String secretKey) throws Exception { // TODO error handling // Compression support // read header and decide the compression(auto detection) // For now hard-code GZIP compression String s3Bucket = getBucketName(s3Path); String s3Key = getS3Key(s3Path); - S3Object fileObj = getS3Client(accessKey, secretKey).getObject(new GetObjectRequest(s3Bucket, s3Key)); + GZIPInputStream objectInputStream = null; + InputStreamReader inputStreamReader = null; + BufferedReader bufferedReader = null; try { - GZIPInputStream objectInputStream = new GZIPInputStream(fileObj.getObjectContent()); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(objectInputStream)); + MinioClient s3Client = getS3Client(s3Endpoint, accessKey, secretKey); + s3Client.statObject(s3Bucket, s3Key); + objectInputStream = new GZIPInputStream(s3Client.getObject(s3Bucket, s3Key)); + inputStreamReader = new InputStreamReader(objectInputStream); + bufferedReader = new BufferedReader(inputStreamReader); return bufferedReader; - } catch (IOException e) { + } catch (Exception e) { logger.error("Error in creating stream reader for s3 file :" + s3Path, e.getCause()); throw e; + } finally { + try { + if (inputStreamReader != null) { + inputStreamReader.close(); + } + if (bufferedReader != null) { + bufferedReader.close(); + } + if (objectInputStream != null) { + objectInputStream.close(); + } + } catch (Exception e) { + // do nothing + } } } - public static void writeIntoS3File(String data, String bucketName, String s3Key, String accessKey, String secretKey) { - InputStream in = null; + public static void writeFileIntoS3File(String filename, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) { try { - in = IOUtils.toInputStream(data, "UTF-8"); - } catch (IOException e) { - logger.error(e); + MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey); + s3Client.putObject(bucketName, s3Key, filename); + } catch (Exception e) { + logger.error("Could not write file to s3", e); } - - if (in != null) { - TransferManager transferManager = getTransferManager(accessKey, secretKey); - try { - if (transferManager != null) { - transferManager.upload(new PutObjectRequest(bucketName, s3Key, in, new ObjectMetadata())).waitForUploadResult(); - logger.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + bucketName); - } - } catch (Exception e) { - logger.error(e); - } finally { - try { - shutdownTransferManager(transferManager); - in.close(); - } catch (IOException e) { - // ignore - } - } + } + + public static void writeDataIntoS3File(String data, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) { + try (ByteArrayInputStream bai = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) { + MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey); + s3Client.putObject(bucketName, s3Key, bai, bai.available(), "application/octet-stream"); + } catch (Exception e) { + logger.error("Could not write data to s3", e); } } } diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java index 24d25c4..628a940 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java @@ -37,6 +37,9 @@ public class LSServerInputS3File extends LSServerInputFileBase { @NotNull @JsonProperty("s3_secret_key") private String s3SecretKey; + + @JsonProperty("s3_endpoint") + private String s3Endpoint; public LSServerInputS3File() {} @@ -45,6 +48,7 @@ public class LSServerInputS3File extends LSServerInputFileBase { InputS3FileDescriptor inputS3FileDescriptor = (InputS3FileDescriptor)inputDescriptor; this.s3AccessKey = inputS3FileDescriptor.getS3AccessKey(); this.s3SecretKey = inputS3FileDescriptor.getS3SecretKey(); + this.s3Endpoint = inputS3FileDescriptor.getS3Endpoint(); } public String getS3AccessKey() { @@ -62,4 +66,12 @@ public class LSServerInputS3File extends LSServerInputFileBase { public void setS3SecretKey(String s3SecretKey) { this.s3SecretKey = s3SecretKey; } + + public String getS3Endpoint() { + return s3Endpoint; + } + + public void setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + } }
