This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3940a AMBARI-24833. Do not open FS on main thread. (#33)
4e3940a is described below
commit 4e3940a647f170dd6fd9f9c3027ce301dde5dee4
Author: Olivér Szabó <[email protected]>
AuthorDate: Wed Nov 21 12:13:38 2018 +0100
AMBARI-24833. Do not open FS on main thread. (#33)
* AMBARI-24833. Do not open FS on main thread.
* AMBARI-24833. Use atomic reference for Configuration object
* AMBARI-24833. Use timeout for creating FS & uploading files.
* AMBARI-24833. Fix typo
---
.../config/local/LogSearchConfigLogFeederLocal.java | 2 +-
.../ambari/logfeeder/common/LogFeederConstants.java | 1 +
.../ambari/logfeeder/conf/LogFeederProps.java | 18 ++++++++++++++++++
.../logfeeder/output/cloud/CloudStorageOutput.java | 2 +-
.../output/cloud/CloudStorageUploader.java | 21 ++++++++++++++++++---
.../output/cloud/upload/HDFSUploadClient.java | 10 ++++++----
6 files changed, 45 insertions(+), 9 deletions(-)
diff --git
a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index 12af637..3f40e88 100644
---
a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++
b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -111,7 +111,7 @@ public class LogSearchConfigLogFeederLocal extends
LogSearchConfigLocal implements
} catch (Exception e) {
final String errorMessage;
if (tries < 3) {
- errorMessage = String.format("Cannot parse input config: %s, will
retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+ errorMessage = String.format("Cannot parse input config: '%s', will
retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
logger.error(errorMessage, e);
try {
Thread.sleep(2000);
diff --git
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a15ac74..e5a6e38 100644
---
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -108,6 +108,7 @@ public class LogFeederConstants {
public static final String CLOUD_STORAGE_MODE =
"logfeeder.cloud.storage.mode";
public static final String CLOUD_STORAGE_DESTINATION =
"logfeeder.cloud.storage.destination";
public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN =
"logfeeder.cloud.storage.upload.on.shutdown";
+ public static final String CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES =
"logfeeder.cloud.storage.uploader.timeout.minutes";
public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS =
"logfeeder.cloud.storage.uploader.interval.seconds";
public static final String CLOUD_STORAGE_BUCKET =
"logfeeder.cloud.storage.bucket";
public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP =
"logfeeder.cloud.storage.bucket.bootstrap";
diff --git
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index b6ab4c7..9ed4c9b 100644
---
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -252,6 +252,16 @@ public class LogFeederProps implements LogFeederProperties
{
private Integer cloudStorageUploaderIntervalSeconds;
@LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES,
+ description = "Timeout value for uploading task to cloud storage in
minutes.",
+ examples = {"10"},
+ defaultValue = "60",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES +
":60}")
+ private Integer cloudStorageUploaderTimeoutMinutes;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
description = "Use hdfs client with cloud connectors instead of the core
clients for shipping data to cloud storage",
examples = {"true"},
@@ -499,6 +509,14 @@ public class LogFeederProps implements LogFeederProperties
{
this.cloudStorageUploaderIntervalSeconds =
cloudStorageUploaderIntervalSeconds;
}
+ public Integer getCloudStorageUploaderTimeoutMinutes() {
+ return cloudStorageUploaderTimeoutMinutes;
+ }
+
+ public void setCloudStorageUploaderTimeoutMinutes(Integer
cloudStorageUploaderTimeoutMinutes) {
+ this.cloudStorageUploaderTimeoutMinutes =
cloudStorageUploaderTimeoutMinutes;
+ }
+
public boolean isUseCloudHdfsClient() {
return useCloudHdfsClient;
}
diff --git
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index fbbffe6..4240fb1 100644
---
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -129,7 +129,7 @@ public class CloudStorageOutput extends
Output<LogFeederProps, InputMarker> {
uploader.interrupt();
if (logFeederProps.isCloudStorageUploadOnShutdown()) {
logger.info("Do last upload before shutdown.");
- uploader.doUpload();
+ uploader.doUpload(2); // hard-coded 2 minutes timeout on shutdown
}
}
diff --git
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index 22c7fc1..cb4cac3 100644
---
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -28,6 +28,10 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* Periodically checks a folder (contains archived logs) and if it finds any
.log or .gz files, it will try to upload them to cloud storage by an upload
client (cloud specific)
@@ -41,6 +45,7 @@ public class CloudStorageUploader extends Thread {
private final String clusterName;
private final String hostName;
private final String uploaderType;
+ private final ExecutorService executorService;
public CloudStorageUploader(String name, UploadClient uploadClient,
LogFeederProps logFeederProps) {
super(name);
@@ -49,6 +54,7 @@ public class CloudStorageUploader extends Thread {
this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
this.clusterName = logFeederProps.getClusterName();
this.hostName = LogFeederUtil.hostName;
+ this.executorService = Executors.newSingleThreadExecutor();
}
@Override
@@ -58,7 +64,7 @@ public class CloudStorageUploader extends Thread {
do {
try {
try {
- doUpload();
+ doUpload(logFeederProps.getCloudStorageUploaderTimeoutMinutes());
} catch (Exception e) {
logger.error("An error occurred during Uploader operation - " +
uploaderType, e);
}
@@ -73,7 +79,7 @@ public class CloudStorageUploader extends Thread {
/**
* Finds .log and .gz files and upload them to cloud storage by an uploader
client
*/
- void doUpload() {
+ void doUpload(int timeout) {
try {
final File archiveLogDir =
Paths.get(logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(),
uploaderType, clusterName, hostName, "archived").toFile();
@@ -88,7 +94,16 @@ public class CloudStorageUploader extends Thread {
String outputPath = String.format("%s/%s/%s/%s/%s", basePath,
clusterName, hostName, file.getParentFile().getName(), file.getName())
.replaceAll("//", "/");
logger.info("Upload will start: input: {}, output: {}",
file.getAbsolutePath(), outputPath);
- uploadClient.upload(file.getAbsolutePath(), outputPath);
+ Future<?> future = executorService.submit(() -> {
+ try {
+ uploadClient.upload(file.getAbsolutePath(), outputPath);
+ } catch (InterruptedException ie) {
+ logger.error("Cloud upload thread interrupted", ie);
+ } catch (Exception e) {
+ logger.error("Exception during cloud upload", e);
+ }
+ });
+ future.get(timeout, TimeUnit.MINUTES);
}
}
} else {
diff --git
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 7e1b471..3d5ec8f 100644
---
a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++
b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* HDFS client that uses core-site.xml file from the classpath to load the
configuration.
* Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to
use one of those cloud storages
@@ -44,7 +46,7 @@ public class HDFSUploadClient implements UploadClient {
private final boolean externalHdfs;
private final HdfsOutputConfig hdfsOutputConfig;
private final FsPermission fsPermission;
- private FileSystem fs;
+ private final AtomicReference<Configuration> configurationRef = new
AtomicReference<>();
public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean
externalHdfs) {
this.hdfsOutputConfig = hdfsOutputConfig;
@@ -84,18 +86,18 @@ public class HDFSUploadClient implements UploadClient {
}
}
logger.info("HDFS client - will use '{}' permission for uploaded files",
hdfsOutputConfig.getHdfsFilePermissions());
- LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configuration);
- this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
+ configurationRef.set(configuration);
+ LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps,
configurationRef.get());
}
@Override
public void upload(String source, String target) throws Exception {
+ final FileSystem fs =
LogFeederHDFSUtil.buildFileSystem(configurationRef.get());
LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true,
this.fsPermission);
}
@Override
public void close() {
- LogFeederHDFSUtil.closeFileSystem(fs);
}
}