kasakrisz commented on a change in pull request #19: AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs
URL: https://github.com/apache/ambari-logsearch/pull/19#discussion_r232543287
 
 

 ##########
 File path: 
ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
 ##########
 @@ -19,12 +19,139 @@
 package org.apache.ambari.logfeeder.output.cloud;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Class to handle common operations for cloud storage outputs
- * TODO !!!
+ * Output class for cloud outputs.
+ * Holds loggers - those will ship logs into specific folders, those files can 
be rolled out to an archive folder,
+ * from there an upload client will be able to ship the log archives to a 
cloud storage
  */
-public abstract class CloudStorageOutput extends Output<LogFeederProps, 
InputMarker> {
+public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+
+  private static final Logger logger = 
LogManager.getLogger(CloudStorageOutput.class);
+
+  private final Map<String, Logger> cloudOutputLoggers = new 
ConcurrentHashMap<>();
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final LoggerContext loggerContext;
+  private final CloudStorageUploader uploader;
+  private final RolloverConfig rolloverConfig;
+
+  public CloudStorageOutput(LogFeederProps logFeederProps) {
+    this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps);
+    this.logFeederProps = logFeederProps;
+    this.rolloverConfig = logFeederProps.getRolloverConfig();
+    loggerContext = (LoggerContext) LogManager.getContext(false);
+    uploader = new CloudStorageUploader(String.format("%s-uploader", 
logFeederProps.getCloudStorageDestination().getText()), uploadClient, 
logFeederProps);
+    uploader.setDaemon(true);
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize cloud output.");
+    uploadClient.init(logFeederProperties);
+    uploader.start();
+  }
+
+  @Override
+  public String getOutputType() {
+    return "cloud";
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws 
Exception {
+    throw new UnsupportedOperationException("Copy file is not supported yet");
+  }
+
+  @Override
+  public void write(String jsonStr, InputMarker inputMarker) throws Exception {
+    String uniqueThreadName = inputMarker.getInput().getThread().getName();
+    Logger cloudLogger = null;
+    if (cloudOutputLoggers.containsKey(uniqueThreadName)) {
+      cloudLogger = cloudOutputLoggers.get(uniqueThreadName);
+    } else {
+      logger.info("New cloud input source found. Register: {}", 
uniqueThreadName);
+      cloudLogger = 
CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, 
logFeederProps);
+      cloudOutputLoggers.put(uniqueThreadName, cloudLogger);
+    }
+    cloudLogger.info(jsonStr);
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "write:cloud";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "cloud";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
 
 Review comment:
   Should `getStatMetricName()` throw an `UnsupportedOperationException` here instead of returning `null`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to