[
https://issues.apache.org/jira/browse/AMBARI-24833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683383#comment-16683383
]
ASF GitHub Bot commented on AMBARI-24833:
-----------------------------------------
kasakrisz commented on a change in pull request #19: AMBARI-24833. Re-implement
S3/HDFS outputs as global cloud outputs
URL: https://github.com/apache/ambari-logsearch/pull/19#discussion_r232543287
##########
File path:
ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
##########
@@ -19,12 +19,139 @@
package org.apache.ambari.logfeeder.output.cloud;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
- * Class to handle common operations for cloud storage outputs
- * TODO !!!
+ * Output class for cloud outputs.
+ * Holds loggers - those will ship logs into specific folders, those files can
be rolled out to an archive folder,
+ * from there an upload client will be able to ship the log archives to a
cloud storage
*/
-public abstract class CloudStorageOutput extends Output<LogFeederProps,
InputMarker> {
+public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+
+ private static final Logger logger =
LogManager.getLogger(CloudStorageOutput.class);
+
+ private final Map<String, Logger> cloudOutputLoggers = new
ConcurrentHashMap<>();
+ private final UploadClient uploadClient;
+ private final LogFeederProps logFeederProps;
+ private final LoggerContext loggerContext;
+ private final CloudStorageUploader uploader;
+ private final RolloverConfig rolloverConfig;
+
+ public CloudStorageOutput(LogFeederProps logFeederProps) {
+ this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps);
+ this.logFeederProps = logFeederProps;
+ this.rolloverConfig = logFeederProps.getRolloverConfig();
+ loggerContext = (LoggerContext) LogManager.getContext(false);
+ uploader = new CloudStorageUploader(String.format("%s-uploader",
logFeederProps.getCloudStorageDestination().getText()), uploadClient,
logFeederProps);
+ uploader.setDaemon(true);
+ }
+
+ @Override
+ public void init(LogFeederProps logFeederProperties) throws Exception {
+ logger.info("Initialize cloud output.");
+ uploadClient.init(logFeederProperties);
+ uploader.start();
+ }
+
+ @Override
+ public String getOutputType() {
+ return "cloud";
+ }
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker) throws
Exception {
+ throw new UnsupportedOperationException("Copy file is not supported yet");
+ }
+
+ @Override
+ public void write(String jsonStr, InputMarker inputMarker) throws Exception {
+ String uniqueThreadName = inputMarker.getInput().getThread().getName();
+ Logger cloudLogger = null;
+ if (cloudOutputLoggers.containsKey(uniqueThreadName)) {
+ cloudLogger = cloudOutputLoggers.get(uniqueThreadName);
+ } else {
+ logger.info("New cloud input source found. Register: {}",
uniqueThreadName);
+ cloudLogger =
CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext,
logFeederProps);
+ cloudOutputLoggers.put(uniqueThreadName, cloudLogger);
+ }
+ cloudLogger.info(jsonStr);
+ inputMarker.getInput().checkIn(inputMarker);
+ }
+
+ @Override
+ public Long getPendingCount() {
+ return 0L;
+ }
+
+ @Override
+ public String getWriteBytesMetricName() {
+ return "write:cloud";
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "cloud";
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return null;
Review comment:
Should `getStatMetricName()` throw an `UnsupportedOperationException` here instead of returning `null`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Log Feeder: send logs to cloud storage (gcs/s3 etc.)
> ----------------------------------------------------
>
> Key: AMBARI-24833
> URL: https://issues.apache.org/jira/browse/AMBARI-24833
> Project: Ambari
> Issue Type: Bug
> Components: ambari-logsearch
> Affects Versions: 2.7.0
> Reporter: Olivér Szabó
> Assignee: Olivér Szabó
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.8.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)