This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2bd65fb2df [Feature][Remote Logging] Add support for writing task logs
to OSS (#13332)
2bd65fb2df is described below
commit 2bd65fb2df68847130c7e1ab6616bbfba95783b2
Author: Rick Cheng <[email protected]>
AuthorDate: Mon Feb 13 16:38:26 2023 +0800
[Feature][Remote Logging] Add support for writing task logs to OSS (#13332)
---
docs/configs/docsdev.js | 8 ++
docs/docs/en/guide/remote-logging.md | 35 +++++
docs/docs/zh/guide/remote-logging.md | 35 +++++
.../common/constants/Constants.java | 23 ++++
.../common/log/remote/OssRemoteLogHandler.java | 143 +++++++++++++++++++++
.../log/remote/RemoteLogHandleThreadPool.java | 44 +++++++
.../common/log/remote/RemoteLogHandler.java | 25 ++++
.../common/log/remote/RemoteLogHandlerFactory.java | 39 ++++++
.../common/log/remote/RemoteLogService.java | 48 +++++++
.../common/log/remote/RemoteLogUtils.java | 79 ++++++++++++
.../src/main/resources/common.properties | 18 +++
.../common/log/remote/OssRemoteLogHandlerTest.java | 53 ++++++++
.../master/runner/WorkflowExecuteRunnable.java | 13 ++
.../remote/processor/LoggerRequestProcessor.java | 63 ++++++++-
.../plugin/task/api/AbstractCommandExecutor.java | 11 ++
.../plugin/task/api/TaskExecutionContext.java | 2 +
.../plugin/task/api/utils/LogUtils.java | 2 +-
.../worker/runner/WorkerTaskExecuteRunnable.java | 17 +++
18 files changed, 652 insertions(+), 6 deletions(-)
diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 6909cb8b97..22968f0c3a 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -406,6 +406,10 @@ export default {
title: 'Data Quality',
link:
'/en-us/docs/dev/user_doc/guide/data-quality.html',
},
+ {
+ title: 'Remote Logging',
+ link:
'/en-us/docs/dev/user_doc/guide/remote-logging.html',
+ },
{
title: 'Upgrade',
children: [
@@ -1046,6 +1050,10 @@ export default {
title: '数据质量',
link:
'/zh-cn/docs/dev/user_doc/guide/data-quality.html',
},
+ {
+ title: '远程日志存储',
+ link:
'/zh-cn/docs/dev/user_doc/guide/remote-logging.html',
+ },
{
title: '升级',
children: [
diff --git a/docs/docs/en/guide/remote-logging.md
b/docs/docs/en/guide/remote-logging.md
new file mode 100644
index 0000000000..add7a58485
--- /dev/null
+++ b/docs/docs/en/guide/remote-logging.md
@@ -0,0 +1,35 @@
+# Remote Logging
+
+Apache DolphinScheduler supports writing task logs to remote storage. When
remote logging is enabled, DolphinScheduler will send the task logs to the
specified remote storage asynchronously after the task ends. In addition, when
the user views or downloads the task log, if the log file does not exist
locally, DolphinScheduler will download the corresponding log file from the
remote storage to the local file system.
+
+## Enabling remote logging
+
+If you deploy DolphinScheduler in `Cluster` or `Pseudo-Cluster` mode, you need
to configure `api-server/conf/common.properties`,
`master-server/conf/common.properties` and
`worker-server/conf/common.properties`.
+If you deploy DolphinScheduler in `Standalone` mode, you only need to
configure `standalone-server/conf/common.properties` as follows:
+
+```properties
+# Whether to enable remote logging
+remote.logging.enable=false
+# if remote.logging.enable = true, set the target of remote logging
+remote.logging.target=OSS
+# if remote.logging.enable = true, set the log base directory
+remote.logging.base.dir=logs
+# if remote.logging.enable = true, set the number of threads to send logs to remote storage
+remote.logging.thread.pool.size=10
+```
+
+## Writing task logs to [Aliyun Object Storage Service (OSS)](https://www.aliyun.com/product/oss)
+
+Configure `common.properties` as follows:
+
+```properties
+# oss access key id, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.id=<access.key.id>
+# oss access key secret, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.secret=<access.key.secret>
+# oss bucket name, required if you set remote.logging.target=OSS
+remote.logging.oss.bucket.name=<bucket.name>
+# oss endpoint, required if you set remote.logging.target=OSS
+remote.logging.oss.endpoint=<endpoint>
+```
+
diff --git a/docs/docs/zh/guide/remote-logging.md
b/docs/docs/zh/guide/remote-logging.md
new file mode 100644
index 0000000000..0adde98e7a
--- /dev/null
+++ b/docs/docs/zh/guide/remote-logging.md
@@ -0,0 +1,35 @@
+# 远程日志存储(Remote Logging)
+
+Apache
DolphinScheduler支持将任务日志传输到远端存储上。当配置开启远程日志存储后,DolphinScheduler将在任务结束后,将对应的任务日志异步地发送到指定的远端存储上。此外,用户在查看或下载任务日志时,若本地没有该日志文件,DolphinScheduler将从远端存储上下载对应的日志文件到本地文件系统。
+
+## 开启远程日志存储
+
+如果您以 `集群` 模式或者 `伪集群`
模式部署DolphinScheduler,您需要对以下路径的文件进行配置:`api-server/conf/common.properties`,`master-server/conf/common.properties`和
`worker-server/conf/common.properties`;
+若您以 `单机` 模式部署DolphinScheduler,您只需要配置
`standalone-server/conf/common.properties`,具体配置如下:
+
+```properties
+# 是否开启远程日志存储
+remote.logging.enable=true
+# 任务日志写入的远端存储,目前仅支持OSS
+remote.logging.target=OSS
+# 任务日志在远端存储上的目录
+remote.logging.base.dir=logs
+# 设置向远端存储异步发送日志的线程池大小
+remote.logging.thread.pool.size=10
+```
+
+## 将任务日志写入[阿里云对象存储(OSS)](https://www.aliyun.com/product/oss)
+
+配置`common.properties`如下:
+
+```properties
+# oss access key id, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.id=<access.key.id>
+# oss access key secret, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.secret=<access.key.secret>
+# oss bucket name, required if you set remote.logging.target=OSS
+remote.logging.oss.bucket.name=<bucket.name>
+# oss endpoint, required if you set remote.logging.target=OSS
+remote.logging.oss.endpoint=<endpoint>
+```
+
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index a304ab5a34..792746bddb 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -806,4 +806,27 @@ public final class Constants {
public static final Integer QUERY_ALL_ON_PROJECT = 1;
public static final Integer QUERY_ALL_ON_WORKFLOW = 2;
public static final Integer QUERY_ALL_ON_TASK = 3;
+
+ /**
+ * remote logging
+ */
+ public static final String REMOTE_LOGGING_ENABLE = "remote.logging.enable";
+
+ public static final String REMOTE_LOGGING_TARGET = "remote.logging.target";
+
+ public static final String REMOTE_LOGGING_BASE_DIR =
"remote.logging.base.dir";
+
+ public static final String REMOTE_LOGGING_THREAD_POOL_SIZE =
"remote.logging.thread.pool.size";
+
+ /**
+ * remote logging for OSS
+ */
+
+ public static final String REMOTE_LOGGING_OSS_ACCESS_KEY_ID =
"remote.logging.oss.access.key.id";
+
+ public static final String REMOTE_LOGGING_OSS_ACCESS_KEY_SECRET =
"remote.logging.oss.access.key.secret";
+
+ public static final String REMOTE_LOGGING_OSS_BUCKET_NAME =
"remote.logging.oss.bucket.name";
+
+ public static final String REMOTE_LOGGING_OSS_ENDPOINT =
"remote.logging.oss.endpoint";
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java
new file mode 100644
index 0000000000..a8d19af98c
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.factory.OssClientFactory;
+import org.apache.dolphinscheduler.common.model.OssConnection;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.Bucket;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.PutObjectRequest;
+
+@Slf4j
+public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
+
+ private static final int OBJECT_NAME_COUNT = 2;
+
+ private OSS ossClient;
+
+ private String bucketName;
+
+ public OssRemoteLogHandler() {
+ }
+
+ public void init() {
+ String accessKeyId = readOssAccessKeyId();
+ String accessKeySecret = readOssAccessKeySecret();
+ String endpoint = readOssEndpoint();
+ ossClient = OssClientFactory.buildOssClient(new
OssConnection(accessKeyId, accessKeySecret, endpoint));
+
+ bucketName = readOssBucketName();
+ checkBucketNameExists(bucketName);
+ }
+
+ @Override
+ public void sendRemoteLog(String logPath) {
+ String objectName = getObjectNameFromLogPath(logPath);
+
+ try {
+ log.info("send remote log {} to OSS {}", logPath, objectName);
+ PutObjectRequest putObjectRequest = new
PutObjectRequest(bucketName, objectName, new File(logPath));
+ ossClient.putObject(putObjectRequest);
+ } catch (Exception e) {
+ log.error("error while sending remote log {} to OSS {}", logPath,
objectName, e);
+ }
+ }
+
+ @Override
+ public void getRemoteLog(String logPath) {
+ String objectName = getObjectNameFromLogPath(logPath);
+
+ try {
+ log.info("get remote log on OSS {} to {}", objectName, logPath);
+ ossClient.getObject(new GetObjectRequest(bucketName, objectName),
new File(logPath));
+ } catch (Exception e) {
+ log.error("error while getting remote log on OSS {} to {}",
objectName, logPath, e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (ossClient != null) {
+ ossClient.shutdown();
+ }
+ }
+
+ private String getObjectNameFromLogPath(String logPath) {
+ Path path = Paths.get(logPath);
+ int nameCount = path.getNameCount();
+
+ if (nameCount < OBJECT_NAME_COUNT) {
+ return Paths.get(readOssBaseDir(), logPath).toString();
+ } else {
+ return Paths.get(readOssBaseDir(), path.subpath(nameCount -
OBJECT_NAME_COUNT, nameCount).toString())
+ .toString();
+ }
+ }
+
+ private void checkBucketNameExists(String bucketName) {
+ if (StringUtils.isBlank(bucketName)) {
+ throw new
IllegalArgumentException(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME + " is
empty");
+ }
+
+ Bucket existsBucket = ossClient.listBuckets()
+ .stream()
+ .filter(
+ bucket -> bucket.getName().equals(bucketName))
+ .findFirst()
+ .orElseThrow(() -> {
+ return new IllegalArgumentException(
+ "bucketName: " + bucketName + " does not exist,
you need to create them by yourself");
+ });
+
+ log.info("bucketName: {} has been found", existsBucket.getName());
+ }
+
+ private String readOssAccessKeyId() {
+ return
PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ACCESS_KEY_ID);
+ }
+
+ private String readOssAccessKeySecret() {
+ return
PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ACCESS_KEY_SECRET);
+ }
+
+ private String readOssEndpoint() {
+ return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_ENDPOINT);
+ }
+
+ private String readOssBucketName() {
+ return
PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME);
+ }
+
+ private String readOssBaseDir() {
+ return PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR);
+ }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandleThreadPool.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandleThreadPool.java
new file mode 100644
index 0000000000..96e743bb3f
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandleThreadPool.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import java.util.concurrent.Executor;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class RemoteLogHandleThreadPool {
+
+ @Bean
+ public Executor remoteLogHandleExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
executor.setCorePoolSize(PropertyUtils.getInt(Constants.REMOTE_LOGGING_THREAD_POOL_SIZE,
10));
+
executor.setMaxPoolSize(PropertyUtils.getInt(Constants.REMOTE_LOGGING_THREAD_POOL_SIZE,
10));
+ executor.setThreadNamePrefix("remote-logging-");
+ executor.initialize();
+
+ return executor;
+ }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandler.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandler.java
new file mode 100644
index 0000000000..4626924304
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+public interface RemoteLogHandler {
+
+ void sendRemoteLog(String logPath);
+
+ void getRemoteLog(String logPath);
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java
new file mode 100644
index 0000000000..a132db8d14
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class RemoteLogHandlerFactory {
+
+ public RemoteLogHandler getRemoteLogHandler() {
+ if (!RemoteLogUtils.isRemoteLoggingEnable()) {
+ return null;
+ }
+ if
(!"OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET)))
{
+ return null;
+ }
+ OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
+ ossRemoteLogHandler.init();
+ return ossRemoteLogHandler;
+ }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java
new file mode 100644
index 0000000000..4097675eb1
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class RemoteLogService {
+
+ @Async("remoteLogHandleExecutor")
+ public void asyncSendRemoteLog(String logPath) {
+ if (RemoteLogUtils.isRemoteLoggingEnable()) {
+ log.info("Start to send log {} to remote target {}", logPath,
+ PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
+
+ RemoteLogHandler remoteLogHandler =
RemoteLogHandlerFactory.getRemoteLogHandler();
+ if (remoteLogHandler == null) {
+ log.error("remote log handler is null");
+ return;
+ }
+ remoteLogHandler.sendRemoteLog(logPath);
+ log.info("Succeed to send log {} to remote target {}", logPath,
+ PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
+ }
+ }
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
new file mode 100644
index 0000000000..9d81a1bfd1
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class RemoteLogUtils {
+
+ private static RemoteLogService remoteLogService;
+
+ @Autowired
+ private RemoteLogService autowiredRemoteLogService;
+
+ @PostConstruct
+ private void init() {
+ remoteLogService = autowiredRemoteLogService;
+ }
+
+ public static void sendRemoteLog(String logPath) {
+ if (isRemoteLoggingEnable()) {
+ // send task logs to remote storage asynchronously
+ remoteLogService.asyncSendRemoteLog(logPath);
+ }
+ }
+
+ public static void getRemoteLog(String logPath) {
+ if (isRemoteLoggingEnable()) {
+ log.info("Start to get log {} from remote target {}", logPath,
+ PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
+
+ mkdirOfLog(logPath);
+ RemoteLogHandler remoteLogHandler =
RemoteLogHandlerFactory.getRemoteLogHandler();
+ if (remoteLogHandler == null) {
+ log.error("remote log handler is null");
+ return;
+ }
+ remoteLogHandler.getRemoteLog(logPath);
+ log.info("End get log {} from remote target {}", logPath,
+ PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
+ }
+ }
+
+ private static void mkdirOfLog(String logPath) {
+ Path directory = Paths.get(logPath).getParent();
+ directory.toFile().mkdirs();
+ }
+
+ public static boolean isRemoteLoggingEnable() {
+ return PropertyUtils.getBoolean(Constants.REMOTE_LOGGING_ENABLE,
Boolean.FALSE);
+ }
+}
diff --git a/dolphinscheduler-common/src/main/resources/common.properties
b/dolphinscheduler-common/src/main/resources/common.properties
index c3a196f41d..513fe53559 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -141,3 +141,21 @@ appId.collect=log
# The default env list will be load by Shell task, e.g.
/etc/profile,~/.bash_profile
shell.env_source_list=
+
+# Whether to enable remote logging
+remote.logging.enable=false
+# if remote.logging.enable = true, set the target of remote logging
+remote.logging.target=OSS
+# if remote.logging.enable = true, set the log base directory
+remote.logging.base.dir=logs
+# if remote.logging.enable = true, set the number of threads to send logs to remote storage
+remote.logging.thread.pool.size=10
+# oss access key id, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.id=<access.key.id>
+# oss access key secret, required if you set remote.logging.target=OSS
+remote.logging.oss.access.key.secret=<access.key.secret>
+# oss bucket name, required if you set remote.logging.target=OSS
+remote.logging.oss.bucket.name=<bucket.name>
+# oss endpoint, required if you set remote.logging.target=OSS
+remote.logging.oss.endpoint=<endpoint>
+
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java
new file mode 100644
index 0000000000..726a262938
--- /dev/null
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.log.remote;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import java.lang.reflect.Method;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class OssRemoteLogHandlerTest {
+
+ @Test
+ public void testGetObjectNameFromLogPath() throws Exception {
+ final String logPath =
"/path/to/dolphinscheduler/logs/20230116/8245922982496_1-1-3.log";
+ final String expectedObjectName =
"logs/20230116/8245922982496_1-1-3.log";
+
+ OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
+
+ try (MockedStatic<PropertyUtils> propertyUtilsMockedStatic =
Mockito.mockStatic(PropertyUtils.class)) {
+ propertyUtilsMockedStatic.when(() ->
PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR))
+ .thenReturn("logs");
+
+ Method method =
OssRemoteLogHandler.class.getDeclaredMethod("getObjectNameFromLogPath",
String.class);
+ method.setAccessible(true);
+ String objectName = (String) method.invoke(ossRemoteLogHandler,
logPath);
+
+ Assertions.assertEquals(expectedObjectName, objectName);
+ }
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 5ebbf8c8ff..3596bcae5a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -42,6 +42,7 @@ import
org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@@ -460,6 +461,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();
+
+ sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(),
taskInstance.getHost());
} catch (Exception ex) {
log.error("Task finish failed, get a exception, will remove this
taskInstance from completeTaskMap", ex);
// remove the task from complete map, so that we can finish in the
next time.
@@ -2178,4 +2181,14 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
+ private void sendTaskLogOnMasterToRemoteIfNeeded(String logPath, String
host) {
+ if (RemoteLogUtils.isRemoteLoggingEnable() &&
isExecutedOnMaster(host)) {
+ RemoteLogUtils.sendRemoteLog(logPath);
+ log.info("Master sends task log {} to remote storage
asynchronously.", logPath);
+ }
+ }
+
+ private boolean isExecutedOnMaster(String host) {
+ return host.endsWith(masterAddress.split(Constants.COLON)[1]);
+ }
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
index 38807bf18a..6259998146 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.remote.processor;
import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -81,7 +82,7 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
command.getBody(), ViewLogRequestCommand.class);
String viewLogPath = viewLogRequest.getPath();
- String msg = LogUtils.readWholeFileContent(viewLogPath);
+ String msg = readWholeFileContent(viewLogPath);
ViewLogResponseCommand viewLogResponse = new
ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
@@ -156,7 +157,7 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
* @param filePath file path
* @return byte array of file
*/
- private byte[] getFileContentBytes(String filePath) {
+ private byte[] getFileContentBytesFromLocal(String filePath) {
try (
InputStream in = new FileInputStream(filePath);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
@@ -172,6 +173,22 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
return new byte[0];
}
+ private byte[] getFileContentBytesFromRemote(String filePath) {
+ RemoteLogUtils.getRemoteLog(filePath);
+ return getFileContentBytesFromLocal(filePath);
+ }
+
+ private byte[] getFileContentBytes(String filePath) {
+ File file = new File(filePath);
+ if (file.exists()) {
+ return getFileContentBytesFromLocal(filePath);
+ }
+ if (RemoteLogUtils.isRemoteLoggingEnable()) {
+ return getFileContentBytesFromRemote(filePath);
+ }
+ return getFileContentBytesFromLocal(filePath);
+ }
+
/**
* read part file content,can skip any line and read some lines
*
@@ -180,9 +197,9 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
* @param limit read lines limit
* @return part file content
*/
- private List<String> readPartFileContent(String filePath,
- int skipLine,
- int limit) {
+ private List<String> readPartFileContentFromLocal(String filePath,
+ int skipLine,
+ int limit) {
File file = new File(filePath);
if (file.exists() && file.isFile()) {
try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
@@ -196,4 +213,40 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
return Collections.emptyList();
}
+ private List<String> readPartFileContentFromRemote(String filePath, // remote-backed variant of the partial read
+ int skipLine,
+ int limit) {
+ RemoteLogUtils.getRemoteLog(filePath); // must run before the read below; presumably materializes the remote copy at filePath - TODO confirm in RemoteLogUtils
+ return readPartFileContentFromLocal(filePath, skipLine, limit); // skipLine/limit are passed through unchanged to the local reader
+ }
+
+ /**
+  * Read a window of lines from a task log, choosing where the log comes from.
+  * A copy already on local disk wins; if it is absent and remote logging is enabled, the
+  * remote-fetch variant is used; otherwise the local reader runs.
+  */
+ private List<String> readPartFileContent(String filePath, int skipLine, int limit) {
+     boolean missingLocally = !new File(filePath).exists();
+     if (missingLocally && RemoteLogUtils.isRemoteLoggingEnable()) {
+         return readPartFileContentFromRemote(filePath, skipLine, limit);
+     }
+     return readPartFileContentFromLocal(filePath, skipLine, limit);
+ }
+
+ private String readWholeFileContentFromRemote(String filePath) { // remote-backed variant of the whole-file read
+ RemoteLogUtils.getRemoteLog(filePath); // must run before the read below; presumably materializes the remote copy at filePath - TODO confirm in RemoteLogUtils
+ return LogUtils.readWholeFileContentFromLocal(filePath); // the actual read is delegated to LogUtils' local reader
+ }
+
+ /**
+  * Local copy wins; the remote fetch is used only when the file is absent locally and
+  * remote logging is enabled.
+  */
+ private String readWholeFileContent(String filePath) {
+     boolean fetchFromRemote = !new File(filePath).exists() && RemoteLogUtils.isRemoteLoggingEnable();
+     return fetchFromRemote
+             ? readWholeFileContentFromRemote(filePath)
+             : LogUtils.readWholeFileContentFromLocal(filePath);
+ }
+
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 201684ddca..faceab2b3d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -21,6 +21,7 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr;
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import
org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
@@ -100,6 +101,11 @@ public abstract class AbstractCommandExecutor {
this.taskRequest = taskRequest;
this.logger = logger;
this.logBuffer = new LinkedBlockingQueue<>();
+
+ if (this.taskRequest != null) {
+ // set logBufferEnable=true if the task uses logHandler and
logBuffer to buffer log messages
+ this.taskRequest.setLogBufferEnable(true);
+ }
}
public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
@@ -335,6 +341,11 @@ public abstract class AbstractCommandExecutor {
logBuffer.clear();
}
logHandler.accept(markerLog);
+
+ if (RemoteLogUtils.isRemoteLoggingEnable()) {
+ RemoteLogUtils.sendRemoteLog(taskRequest.getLogPath());
+ logger.info("Log handler sends task log {} to remote storage
asynchronously.", taskRequest.getLogPath());
+ }
}
/**
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 4e02f954ed..2a676755fc 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -268,4 +268,6 @@ public class TaskExecutionContext implements Serializable {
* test flag
*/
private int testFlag;
+
+ private boolean logBufferEnable;
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
index 16849d3837..9bff3b66c1 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -164,7 +164,7 @@ public class LogUtils {
}
}
- public static String readWholeFileContent(String filePath) {
+ public static String readWholeFileContentFromLocal(String filePath) {
String line;
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(new
FileInputStream(filePath)))) {
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 631f7298d3..103e253800 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -23,6 +23,7 @@ import static
org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLA
import static
org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -120,6 +121,8 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
log.info("Remove the current task execute context from worker cache");
clearTaskExecPathIfNeeded();
+
+ sendTaskLogOnWorkerToRemoteIfNeeded();
}
protected void afterThrowing(Throwable throwable) throws TaskException {
@@ -131,6 +134,8 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
log.info(
"Get a exception when execute the task, will send the task
execute result to master, the current task execute result is {}",
TaskExecutionStatus.FAILURE);
+
+ sendTaskLogOnWorkerToRemoteIfNeeded();
}
public void cancelTask() {
@@ -279,6 +284,18 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
taskExecutionContext.getCurrentExecutionStatus());
}
+ protected void sendTaskLogOnWorkerToRemoteIfNeeded() {
+ if (taskExecutionContext.isLogBufferEnable()) {
+ return;
+ }
+
+ if (RemoteLogUtils.isRemoteLoggingEnable()) {
+ RemoteLogUtils.sendRemoteLog(taskExecutionContext.getLogPath());
+ log.info("Worker sends task log {} to remote storage
asynchronously.",
+ taskExecutionContext.getLogPath());
+ }
+ }
+
protected void clearTaskExecPathIfNeeded() {
String execLocalPath = taskExecutionContext.getExecutePath();
if (!CommonUtils.isDevelopMode()) {