kasakrisz commented on a change in pull request #19: AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs URL: https://github.com/apache/ambari-logsearch/pull/19#discussion_r232543287
########## File path: ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java ########## @@ -19,12 +19,139 @@ package org.apache.ambari.logfeeder.output.cloud; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.conf.output.RolloverConfig; +import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient; +import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory; +import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.input.InputMarker; import org.apache.ambari.logfeeder.plugin.output.Output; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.RollingFileAppender; +import org.apache.logging.log4j.core.config.Configuration; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** - * Class to handle common operations for cloud storage outputs - * TODO !!! + * Output class for cloud outputs. + * Holds loggers - those will ship logs into specific folders, those files can be rolled out to an archive folder, + * from there an upload client will be able to ship the log archives to a cloud storage */ -public abstract class CloudStorageOutput extends Output<LogFeederProps, InputMarker> { +public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> { + + private static final Logger logger = LogManager.getLogger(CloudStorageOutput.class); + + private final Map<String, Logger> cloudOutputLoggers = new ConcurrentHashMap<>(); + private final UploadClient uploadClient; + private final LogFeederProps logFeederProps; + private final LoggerContext loggerContext; + private final CloudStorageUploader uploader; + private final RolloverConfig rolloverConfig; + + public CloudStorageOutput(LogFeederProps logFeederProps) { + this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps); + this.logFeederProps = logFeederProps; + this.rolloverConfig = logFeederProps.getRolloverConfig(); + loggerContext = (LoggerContext) LogManager.getContext(false); + uploader = new CloudStorageUploader(String.format("%s-uploader", logFeederProps.getCloudStorageDestination().getText()), uploadClient, logFeederProps); + uploader.setDaemon(true); + } + + @Override + public void init(LogFeederProps logFeederProperties) throws Exception { + logger.info("Initialize cloud output."); + uploadClient.init(logFeederProperties); + uploader.start(); + } + + @Override + public String getOutputType() { + return "cloud"; + } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) throws Exception { + throw new UnsupportedOperationException("Copy file is not supported yet"); + } + + @Override + public void write(String jsonStr, InputMarker inputMarker) throws Exception { + String uniqueThreadName = inputMarker.getInput().getThread().getName(); + Logger cloudLogger = null; + if (cloudOutputLoggers.containsKey(uniqueThreadName)) { + cloudLogger = cloudOutputLoggers.get(uniqueThreadName); + } else { + logger.info("New cloud input source found. Register: {}", uniqueThreadName); + cloudLogger = CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, logFeederProps); + cloudOutputLoggers.put(uniqueThreadName, cloudLogger); + } + cloudLogger.info(jsonStr); + inputMarker.getInput().checkIn(inputMarker); + } + + @Override + public Long getPendingCount() { + return 0L; + } + + @Override + public String getWriteBytesMetricName() { + return "write:cloud"; + } + + @Override + public String getShortDescription() { + return "cloud"; + } + + @Override + public String getStatMetricName() { + return null; Review comment: Should an `UnsupportedOperationException` be thrown? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services