This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch branch-feature-logsearch-ga
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit df10aa765d8a2b7cc433023200885aa79d3deb3d
Author: Oliver Szabo <[email protected]>
AuthorDate: Sun Sep 23 16:00:24 2018 +0200

    Get rid of AWS sdk
---
 .../model/inputconfig/InputS3FileDescriptor.java   |  3 +
 .../impl/InputS3FileDescriptorImpl.java            | 18 ++++
 .../ambari-logsearch-logfeeder/pom.xml             | 11 +--
 .../apache/ambari/logfeeder/input/InputS3File.java |  7 +-
 .../ambari/logfeeder/output/OutputS3File.java      |  4 +-
 .../logfeeder/output/S3OutputConfiguration.java    | 10 ++-
 .../apache/ambari/logfeeder/output/S3Uploader.java | 18 +---
 .../org/apache/ambari/logfeeder/util/S3Util.java   | 95 ++++++++++------------
 .../model/common/LSServerInputS3File.java          | 12 +++
 9 files changed, 99 insertions(+), 79 deletions(-)

diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
index b075629..9886793 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputS3FileDescriptor.java
@@ -20,6 +20,9 @@
 package org.apache.ambari.logsearch.config.api.model.inputconfig;
 
 public interface InputS3FileDescriptor extends InputFileBaseDescriptor {
+
+  String getS3Endpoint();
+
   String getS3AccessKey();
 
   String getS3SecretKey();
diff --git 
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java
 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java
index 527dae8..5d2c19c 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputS3FileDescriptorImpl.java
@@ -49,6 +49,15 @@ public class InputS3FileDescriptorImpl extends 
InputFileBaseDescriptorImpl imple
   @SerializedName("s3_secret_key")
   private String s3SecretKey;
 
+  @ShipperConfigElementDescription(
+    path = "/input/[]/s3_endpoint",
+    type = "string",
+    description = "Endpoint URL for S3."
+  )
+  @Expose
+  @SerializedName("s3_endpoint")
+  private String s3Endpoint;
+
   @Override
   public String getS3AccessKey() {
     return s3AccessKey;
@@ -66,4 +75,13 @@ public class InputS3FileDescriptorImpl extends 
InputFileBaseDescriptorImpl imple
   public void setS3SecretKey(String s3SecretKey) {
     this.s3SecretKey = s3SecretKey;
   }
+
+  @Override
+  public String getS3Endpoint() {
+    return s3Endpoint;
+  }
+
+  public void setS3Endpoint(String s3Endpoint) {
+    this.s3Endpoint = s3Endpoint;
+  }
 }
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml 
b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index bdf3928..6dd3905 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -145,14 +145,9 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-core</artifactId>
-      <version>1.11.412</version>
-    </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
-      <version>1.11.412</version>
+      <groupId>io.minio</groupId>
+      <artifactId>minio</artifactId>
+      <version>5.0.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 274183c..c4d5fb9 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input;
 
+import org.apache.ambari.logfeeder.output.S3OutputConfiguration;
 import org.apache.ambari.logfeeder.util.S3Util;
 import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
 import org.apache.commons.lang.ArrayUtils;
@@ -82,7 +83,11 @@ public class InputS3File extends InputFile {
   public BufferedReader openLogFile(File logPathFile) throws Exception {
     String s3AccessKey = 
((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey();
     String s3SecretKey = 
((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey();
-    BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, 
s3SecretKey);
+    String s3Endpoint = 
((InputS3FileDescriptor)getInputDescriptor()).getS3Endpoint();
+    if (s3Endpoint == null) {
+      s3Endpoint = S3OutputConfiguration.DEFAULT_S3_ENDPOINT;
+    }
+    BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3Endpoint, 
s3AccessKey, s3SecretKey);
     Object fileKey = getFileKey(logPathFile);
     setFileKey(fileKey);
     String base64FileKey = 
Base64.byteArrayToBase64(getFileKey().toString().getBytes());
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index fb13010..38a2937 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -130,8 +130,8 @@ public class OutputS3File extends OutputFile implements 
RolloverCondition, Rollo
     String s3ResolvedKey = new 
S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), 
s3KeySuffix,
         s3OutputConfiguration.getCluster());
 
-    S3Util.writeIntoS3File(configJson, 
s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
-        s3OutputConfiguration.getS3AccessKey(), 
s3OutputConfiguration.getS3SecretKey());
+    S3Util.writeDataIntoS3File(configJson, 
s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
+      s3OutputConfiguration.getS3Endpoint(), 
s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
   }
 
   private String getComponentConfigFileName(String componentName) {
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index a2d7692..293f011 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -37,6 +37,8 @@ public class S3OutputConfiguration {
   public static final String S3_LOG_DIR_KEY = "s3_log_dir";
   public static final String S3_ACCESS_KEY = "s3_access_key";
   public static final String S3_SECRET_KEY = "s3_secret_key";
+  public static final String S3_ENDPOINT = "s3_endpoint";
+  public static final String DEFAULT_S3_ENDPOINT = "https://s3.amazonaws.com";;
   public static final String COMPRESSION_ALGO_KEY = "compression_algo";
   public static final String ADDITIONAL_FIELDS_KEY = "add_fields";
   public static final String CLUSTER_KEY = "cluster";
@@ -51,6 +53,10 @@ public class S3OutputConfiguration {
     return (String) configs.get(S3_BUCKET_NAME_KEY);
   }
 
+  public String getS3Endpoint() {
+    return (String) configs.getOrDefault(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
+  }
+
   public String getS3Path() {
     return (String) configs.get(S3_LOG_DIR_KEY);
   }
@@ -84,7 +90,7 @@ public class S3OutputConfiguration {
     Map<String, Object> configs = new HashMap<>();
     String[] stringValuedKeysToCopy = new String[] {
         SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
-        S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY
+        S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY, S3_ENDPOINT
     };
 
     for (String key : stringValuedKeysToCopy) {
@@ -108,6 +114,8 @@ public class S3OutputConfiguration {
 
     configs.put(ADDITIONAL_FIELDS_KEY, 
configItem.getNVList(ADDITIONAL_FIELDS_KEY));
 
+    configs.putIfAbsent(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
+
     return new S3OutputConfiguration(configs);
   }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index bbd0cef..9e0f8b8 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -18,8 +18,6 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.CompressionUtil;
@@ -129,6 +127,7 @@ public class S3Uploader implements Runnable {
     String s3AccessKey = s3OutputConfiguration.getS3AccessKey();
     String s3SecretKey = s3OutputConfiguration.getS3SecretKey();
     String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
+    String s3Endpoint = s3OutputConfiguration.getS3Endpoint();
 
     String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
     String s3Path = new S3LogPathResolver().getResolvedPath(
@@ -138,7 +137,7 @@ public class S3Uploader implements Runnable {
     File sourceFile = createCompressedFileForUpload(fileToUpload, 
compressionAlgo);
 
     logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", 
" + s3Path);
-    uploadFileToS3(bucketName, s3Path, sourceFile, s3AccessKey, s3SecretKey);
+    S3Util.writeFileIntoS3File(sourceFile.getAbsolutePath(), bucketName, 
s3Path, s3Endpoint, s3AccessKey, s3SecretKey);
 
     // delete local compressed file
     sourceFile.delete();
@@ -152,19 +151,6 @@ public class S3Uploader implements Runnable {
   }
 
   @VisibleForTesting
-  protected void uploadFileToS3(String bucketName, String s3Key, File 
localFile, String accessKey, String secretKey) {
-    TransferManager transferManager = S3Util.getTransferManager(accessKey, 
secretKey);
-    try {
-      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
-      upload.waitForUploadResult();
-    } catch (InterruptedException e) {
-      logger.error("s3 uploading failed for file :" + 
localFile.getAbsolutePath(), e);
-    } finally {
-      S3Util.shutdownTransferManager(transferManager);
-    }
-  }
-
-  @VisibleForTesting
   protected File createCompressedFileForUpload(File fileToUpload, String 
compressionAlgo) {
     File outputFile = new File(fileToUpload.getParent(), 
fileToUpload.getName() + "_" + new Date().getTime() +
         "." + compressionAlgo);
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
index aa3aa4c..632594b 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
@@ -19,23 +19,18 @@
 package org.apache.ambari.logfeeder.util;
 
 import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.ByteArrayInputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.zip.GZIPInputStream;
 
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import io.minio.MinioClient;
+import io.minio.errors.InvalidEndpointException;
+import io.minio.errors.InvalidPortException;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.commons.io.IOUtils;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.transfer.TransferManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,19 +44,8 @@ public class S3Util {
     throw new UnsupportedOperationException();
   }
   
-  public static AmazonS3 getS3Client(String accessKey, String secretKey) {
-    AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
-    return AmazonS3ClientBuilder.defaultClient();
-  }
-
-  public static TransferManager getTransferManager(String accessKey, String 
secretKey) {
-    return new TransferManager();
-  }
-
-  public static void shutdownTransferManager(TransferManager transferManager) {
-    if (transferManager != null) {
-      transferManager.shutdownNow();
-    }
+  public static MinioClient getS3Client(String endpoint, String accessKey, 
String secretKey) throws InvalidPortException, InvalidEndpointException {
+    return new MinioClient(endpoint, accessKey, secretKey);
   }
 
   public static String getBucketName(String s3Path) {
@@ -93,49 +77,58 @@ public class S3Util {
   /**
    * Get the buffer reader to read s3 file as a stream
    */
-  public static BufferedReader getReader(String s3Path, String accessKey, 
String secretKey) throws IOException {
+  public static BufferedReader getReader(String s3Path, String s3Endpoint, 
String accessKey, String secretKey) throws Exception {
     // TODO error handling
     // Compression support
     // read header and decide the compression(auto detection)
     // For now hard-code GZIP compression
     String s3Bucket = getBucketName(s3Path);
     String s3Key = getS3Key(s3Path);
-    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(new 
GetObjectRequest(s3Bucket, s3Key));
+    GZIPInputStream objectInputStream = null;
+    InputStreamReader inputStreamReader = null;
+    BufferedReader bufferedReader = null;
     try {
-      GZIPInputStream objectInputStream = new 
GZIPInputStream(fileObj.getObjectContent());
-      BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(objectInputStream));
+      MinioClient s3Client = getS3Client(s3Endpoint, accessKey, secretKey);
+      s3Client.statObject(s3Bucket, s3Key);
+      objectInputStream = new GZIPInputStream(s3Client.getObject(s3Bucket, 
s3Key));
+      inputStreamReader = new InputStreamReader(objectInputStream);
+      bufferedReader = new BufferedReader(inputStreamReader);
       return bufferedReader;
-    } catch (IOException e) {
+    } catch (Exception e) {
       logger.error("Error in creating stream reader for s3 file :" + s3Path, 
e.getCause());
       throw e;
+    } finally {
+      try {
+        if (inputStreamReader != null) {
+          inputStreamReader.close();
+        }
+        if (bufferedReader != null) {
+          bufferedReader.close();
+        }
+        if (objectInputStream != null) {
+          objectInputStream.close();
+        }
+      } catch (Exception e) {
+        // do nothing
+      }
     }
   }
 
-  public static void writeIntoS3File(String data, String bucketName, String 
s3Key, String accessKey, String secretKey) {
-    InputStream in = null;
+  public static void writeFileIntoS3File(String filename, String bucketName, 
String s3Key, String endpoint, String accessKey, String secretKey) {
     try {
-      in = IOUtils.toInputStream(data, "UTF-8");
-    } catch (IOException e) {
-      logger.error(e);
+      MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
+      s3Client.putObject(bucketName, s3Key, filename);
+    } catch (Exception e) {
+      logger.error("Could not write file to s3", e);
     }
-    
-    if (in != null) {
-      TransferManager transferManager = getTransferManager(accessKey, 
secretKey);
-      try {
-        if (transferManager != null) {
-          transferManager.upload(new PutObjectRequest(bucketName, s3Key, in, 
new ObjectMetadata())).waitForUploadResult();
-          logger.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + 
bucketName);
-        }
-      } catch (Exception e) {
-        logger.error(e);
-      } finally {
-        try {
-          shutdownTransferManager(transferManager);
-          in.close();
-        } catch (IOException e) {
-          // ignore
-        }
-      }
+  }
+
+  public static void writeDataIntoS3File(String data, String bucketName, 
String s3Key, String endpoint, String accessKey, String secretKey) {
+    try (ByteArrayInputStream bai = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
+      MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
+      s3Client.putObject(bucketName, s3Key, bai, bai.available(), 
"application/octet-stream");
+    } catch (Exception e) {
+      logger.error("Could not write data to s3", e);
     }
   }
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
index 24d25c4..628a940 100644
--- 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputS3File.java
@@ -37,6 +37,9 @@ public class LSServerInputS3File extends 
LSServerInputFileBase {
   @NotNull
   @JsonProperty("s3_secret_key")
   private String s3SecretKey;
+
+  @JsonProperty("s3_endpoint")
+  private String s3Endpoint;
   
   public LSServerInputS3File() {}
   
@@ -45,6 +48,7 @@ public class LSServerInputS3File extends 
LSServerInputFileBase {
     InputS3FileDescriptor inputS3FileDescriptor = 
(InputS3FileDescriptor)inputDescriptor;
     this.s3AccessKey = inputS3FileDescriptor.getS3AccessKey();
     this.s3SecretKey = inputS3FileDescriptor.getS3SecretKey();
+    this.s3Endpoint = inputS3FileDescriptor.getS3Endpoint();
   }
 
   public String getS3AccessKey() {
@@ -62,4 +66,12 @@ public class LSServerInputS3File extends 
LSServerInputFileBase {
   public void setS3SecretKey(String s3SecretKey) {
     this.s3SecretKey = s3SecretKey;
   }
+
+  public String getS3Endpoint() {
+    return s3Endpoint;
+  }
+
+  public void setS3Endpoint(String s3Endpoint) {
+    this.s3Endpoint = s3Endpoint;
+  }
 }

Reply via email to