This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 43f5f24529 Add dolphinscheduler-extract-common module (#15266)
43f5f24529 is described below
commit 43f5f24529508bc36fef005c551cb0e06d0d75ca
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Dec 5 21:06:39 2023 +0800
Add dolphinscheduler-extract-common module (#15266)
---
.../api/service/impl/LoggerServiceImpl.java | 130 ++++++---------------
.../api/service/impl/TaskInstanceServiceImpl.java | 19 +--
.../api/service/LoggerServiceTest.java | 116 +++++++++++-------
dolphinscheduler-extract/README.md | 21 ++++
.../pom.xml | 10 +-
.../extract/common/ILogService.java} | 17 +--
.../common}/transportor/GetAppIdRequest.java | 4 +-
.../common}/transportor/GetAppIdResponse.java | 4 +-
.../TaskInstanceLogFileDownloadRequest.java | 3 +-
.../TaskInstanceLogFileDownloadResponse.java | 3 +-
.../TaskInstanceLogPageQueryRequest.java | 9 +-
.../TaskInstanceLogPageQueryResponse.java | 2 +-
.../dolphinscheduler-extract-master/pom.xml | 5 +
.../extract/master/IMasterLogService.java | 38 ------
.../LogicTaskInstanceLogFileDownloadRequest.java | 32 -----
.../LogicTaskInstanceLogFileDownloadResponse.java | 30 -----
.../LogicTaskInstanceLogPageQueryRequest.java | 35 ------
.../LogicTaskInstanceLogPageQueryResponse.java | 31 -----
.../dolphinscheduler-extract-worker/pom.xml | 5 +
dolphinscheduler-extract/pom.xml | 1 +
.../server/master/rpc/MasterLogServiceImpl.java | 41 +++----
.../master/service/WorkerFailoverService.java | 12 +-
.../service/process/ProcessServiceImpl.java | 16 +--
.../server/worker/rpc/WorkerLogServiceImpl.java | 37 +++---
24 files changed, 218 insertions(+), 403 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 24dc41ce3a..e9222c715e 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -26,8 +26,6 @@ import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
-import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -37,25 +35,15 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import org.apache.dolphinscheduler.extract.master.IMasterLogService;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
-import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
-import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
+import org.apache.dolphinscheduler.extract.common.ILogService;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.File;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -108,8 +96,7 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
log.error("Host of task instance is null, taskInstanceId:{}.",
taskInstId);
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
- Project project =
projectMapper.queryProjectByTaskInstanceId(taskInstId);
- projectService.checkProjectAndAuthThrowException(loginUser, project,
VIEW_LOG);
+ projectService.checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), VIEW_LOG);
Result<ResponseTaskLog> result = new
Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance, skipLineNum, limit);
int lineNum = log.split("\\r\\n").length;
@@ -199,7 +186,6 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int
limit) {
final String logPath = taskInstance.getLogPath();
- final String host = taskInstance.getHost();
log.info("Query task instance log, taskInstanceId:{},
taskInstanceName:{}, host: {}, logPath:{}",
taskInstance.getId(), taskInstance.getName(),
taskInstance.getHost(), logPath);
StringBuilder sb = new StringBuilder();
@@ -211,48 +197,24 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
sb.append(head);
}
- String logContent = null;
- if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
- try {
- LogicTaskInstanceLogPageQueryRequest
logicTaskInstanceLogPageQueryRequest =
- new
LogicTaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath,
skipLineNum, limit);
- LogicTaskInstanceLogPageQueryResponse
logicTaskInstanceLogPageQueryResponse =
-
masterLogService.pageQueryLogicTaskInstanceLog(logicTaskInstanceLogPageQueryRequest);
- logContent =
logicTaskInstanceLogPageQueryResponse.getLogContent();
- } catch (Exception ex) {
- log.error("Query LogicTaskInstance log error", ex);
- }
- } else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(host, IWorkerLogService.class);
- try {
- TaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest =
- new
TaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum,
limit);
- TaskInstanceLogPageQueryResponse
taskInstanceLogPageQueryResponse =
-
iWorkerLogService.pageQueryTaskInstanceLog(taskInstanceLogPageQueryRequest);
- logContent = taskInstanceLogPageQueryResponse.getLogContent();
- } catch (Exception ex) {
- log.error("Query LogicTaskInstance log error", ex);
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
+ try {
+ TaskInstanceLogPageQueryRequest request =
TaskInstanceLogPageQueryRequest.builder()
+ .taskInstanceId(taskInstance.getId())
+ .taskInstanceLogAbsolutePath(logPath)
+ .skipLineNum(skipLineNum)
+ .limit(limit)
+ .build();
+ TaskInstanceLogPageQueryResponse response =
iLogService.pageQueryTaskInstanceLog(request);
+ String logContent = response.getLogContent();
+ if (logContent != null) {
+ sb.append(logContent);
}
+ return sb.toString();
+ } catch (Throwable ex) {
+ throw new ServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR,
ex);
}
- if (logContent == null && RemoteLogUtils.isRemoteLoggingEnable()) {
- // When getting the log for the first time (skipLineNum=0) returns
empty, get the log from remote target
- try {
- log.info("Get log {} from remote target", logPath);
- RemoteLogUtils.getRemoteLog(logPath);
- List<String> lines =
LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit);
- logContent = LogUtils.rollViewLogLines(lines);
- FileUtils.delete(new File(logPath));
- } catch (IOException e) {
- log.error("Error while getting log from remote target", e);
- }
- }
- if (logContent != null) {
- sb.append(logContent);
- }
- return sb.toString();
}
/**
@@ -271,45 +233,19 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
byte[] logBytes = new byte[0];
- if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
- try {
- LogicTaskInstanceLogFileDownloadRequest
logicTaskInstanceLogFileDownloadRequest =
- new
LogicTaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
- LogicTaskInstanceLogFileDownloadResponse
logicTaskInstanceLogFileDownloadResponse =
-
masterLogService.getLogicTaskInstanceWholeLogFileBytes(logicTaskInstanceLogFileDownloadRequest);
- logBytes =
logicTaskInstanceLogFileDownloadResponse.getLogBytes();
- } catch (Exception ex) {
- log.error("Query LogicTaskInstance log error", ex);
- }
- } else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(host, IWorkerLogService.class);
- try {
- TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest =
- new
TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
- TaskInstanceLogFileDownloadResponse
taskInstanceWholeLogFileBytes =
-
iWorkerLogService.getTaskInstanceWholeLogFileBytes(taskInstanceLogFileDownloadRequest);
- logBytes = taskInstanceWholeLogFileBytes.getLogBytes();
- } catch (Exception ex) {
- log.error("Query LogicTaskInstance log error", ex);
- }
- }
- if ((logBytes == null || logBytes.length == 0) &&
RemoteLogUtils.isRemoteLoggingEnable()) {
- // get task log from remote target
- try {
- log.info("Get log {} from remote target", logPath);
- RemoteLogUtils.getRemoteLog(logPath);
- File logFile = new File(logPath);
- logBytes = FileUtils.readFileToByteArray(logFile);
- FileUtils.delete(logFile);
- } catch (IOException e) {
- log.error("Error while getting log from remote target", e);
- }
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
+ try {
+ TaskInstanceLogFileDownloadRequest request =
+ new
TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
+ TaskInstanceLogFileDownloadResponse response =
iLogService.getTaskInstanceWholeLogFileBytes(request);
+ logBytes = response.getLogBytes();
+ return Bytes.concat(head, logBytes);
+ } catch (Exception ex) {
+ log.error("Download TaskInstance: {} Log Error",
taskInstance.getName(), ex);
+ throw new
ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR);
}
- return Bytes.concat(head, logBytes);
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 026b54e701..330374779f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -44,16 +44,14 @@ import
org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import org.apache.dolphinscheduler.extract.master.IMasterLogService;
+import org.apache.dolphinscheduler.extract.common.ILogService;
import
org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
-import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@@ -381,18 +379,9 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
return;
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
- // delete log
- if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
- if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
-
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
- } else {
- IWorkerLogService workerLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
-
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
- }
- }
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
+ iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}
dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index cd95511130..1eb7a2575b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -21,8 +21,10 @@ import static
org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThro
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
+import org.apache.dolphinscheduler.api.AssertionsHelper;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
@@ -36,26 +38,34 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+import org.apache.dolphinscheduler.extract.common.ILogService;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * logger service test
- */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class LoggerServiceTest {
@@ -77,6 +87,45 @@ public class LoggerServiceTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
+ private NettyRemotingServer nettyRemotingServer;
+
+ @BeforeEach
+ public void setUp() {
+ nettyRemotingServer = new NettyRemotingServer(new
NettyServerConfig(8080));
+ nettyRemotingServer.start();
+ SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
+ new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
+ springServerMethodInvokerDiscovery.postProcessAfterInitialization(new
ILogService() {
+
+ @Override
+ public TaskInstanceLogFileDownloadResponse
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest) {
+ return new TaskInstanceLogFileDownloadResponse(new byte[0]);
+ }
+
+ @Override
+ public TaskInstanceLogPageQueryResponse
pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest) {
+ return new TaskInstanceLogPageQueryResponse();
+ }
+
+ @Override
+ public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
+ return new GetAppIdResponse();
+ }
+
+ @Override
+ public void removeTaskInstanceLog(String
taskInstanceLogAbsolutePath) {
+
+ }
+ }, "iLogServiceImpl");
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (nettyRemotingServer != null) {
+ nettyRemotingServer.close();
+ }
+ }
+
@Test
public void testQueryLog() {
@@ -101,27 +150,20 @@ public class LoggerServiceTest {
// PROJECT_NOT_EXIST
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- Project project = getProject(1);
-
Mockito.when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project);
- try {
- Mockito.doThrow(new
ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
- .checkProjectAndAuthThrowException(loginUser, project,
VIEW_LOG);
- loggerService.queryLog(loginUser, 1, 1, 1);
- } catch (ServiceException serviceException) {
- Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(),
serviceException.getCode());
- }
+ doThrow(new
ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
+ .checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), VIEW_LOG);
+ AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
+ () -> loggerService.queryLog(loginUser, 1, 1, 1));
// USER_NO_OPERATION_PERM
- try {
- Mockito.doThrow(new
ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService)
- .checkProjectAndAuthThrowException(loginUser, project,
VIEW_LOG);
- loggerService.queryLog(loginUser, 1, 1, 1);
- } catch (ServiceException serviceException) {
- Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
serviceException.getCode());
- }
+ doThrow(new
ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService)
+ .checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), VIEW_LOG);
+
AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM,
+ () -> loggerService.queryLog(loginUser, 1, 1, 1));
// SUCCESS
-
doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser,
project, VIEW_LOG);
+
doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(),
+ VIEW_LOG);
when(taskInstanceDao.queryById(1)).thenReturn(taskInstance);
result = loggerService.queryLog(loginUser, 1, 1, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
result.getCode().intValue());
@@ -158,30 +200,22 @@ public class LoggerServiceTest {
// PROJECT_NOT_EXIST
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- try {
- Mockito.doThrow(new
ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
- .checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), DOWNLOAD_LOG);
- loggerService.queryLog(loginUser, 1, 1, 1);
- } catch (ServiceException serviceException) {
- Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(),
serviceException.getCode());
- }
+ doThrow(new
ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
+ .checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), VIEW_LOG);
+ AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
+ () -> loggerService.queryLog(loginUser, 1, 1, 1));
// USER_NO_OPERATION_PERM
- Project project = getProject(1);
-
when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project);
- try {
- Mockito.doThrow(new
ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService)
- .checkProjectAndAuthThrowException(loginUser, project,
DOWNLOAD_LOG);
- loggerService.queryLog(loginUser, 1, 1, 1);
- } catch (ServiceException serviceException) {
- Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
serviceException.getCode());
- }
+ doThrow(new
ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService)
+ .checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(), VIEW_LOG);
+
AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM,
+ () -> loggerService.queryLog(loginUser, 1, 1, 1));
// SUCCESS
-
doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser,
project, DOWNLOAD_LOG);
-
when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project);
- byte[] result = loggerService.getLogBytes(loginUser, 1);
- Assertions.assertEquals(47, result.length);
+
doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser,
taskInstance.getProjectCode(),
+ DOWNLOAD_LOG);
+ byte[] logBytes = loggerService.getLogBytes(loginUser, 1);
+ Assertions.assertEquals(47, logBytes.length);
}
@Test
diff --git a/dolphinscheduler-extract/README.md
b/dolphinscheduler-extract/README.md
new file mode 100644
index 0000000000..0bf173d560
--- /dev/null
+++ b/dolphinscheduler-extract/README.md
@@ -0,0 +1,21 @@
+# Introduction
+
+This module contains the RPC interface which can be used to communicate with
the DolphinScheduler server.
+
+# [dolphinscheduler-extract-base](dolphinscheduler-extract-base)
+
+The base module contains the basic interfaces for how to define the RPC client
and server.
+
+# [dolphinscheduler-extract-common](dolphinscheduler-extract-common)
+
+The common module contains the common interface which can be used by both the
master and worker.
+
+# [dolphinscheduler-extract-master](dolphinscheduler-extract-master)
+
+This module contains the RPC interface which can be used by communicate with
the master server.
+
+# [dolphinscheduler-extract-worker](dolphinscheduler-extract-worker)
+This module contains the RPC interface which can be used by communicate with
the worker server.
+
+# [dolphinscheduler-extract-alert](dolphinscheduler-extract-alert)
+This module contains the RPC interface which can be used by communicate with
the alert server.
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
b/dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml
similarity index 86%
copy from dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
copy to dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml
index da5fbd3b3f..d7d339ff9f 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-common/pom.xml
@@ -27,19 +27,15 @@
<version>dev-SNAPSHOT</version>
</parent>
- <artifactId>dolphinscheduler-extract-worker</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-extract-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
+ <version>${project.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-task-api</artifactId>
- </dependency>
-
</dependencies>
</project>
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java
similarity index 76%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java
index d03191888a..c602f15bb2 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/IWorkerLogService.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/ILogService.java
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker;
+package org.apache.dolphinscheduler.extract.common;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
@RpcService
-public interface IWorkerLogService {
+public interface ILogService {
@RpcMethod
TaskInstanceLogFileDownloadResponse
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest);
@@ -40,4 +40,5 @@ public interface IWorkerLogService {
@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java
similarity index 91%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java
index 1148a6f24c..d6f0077b63 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdRequest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdRequest.java
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
+@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GetAppIdRequest {
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java
similarity index 94%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java
index 3b95a4d66e..db3fc2af89 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/GetAppIdResponse.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/GetAppIdResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import java.util.List;
@@ -24,8 +24,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
@Data
-@NoArgsConstructor
@AllArgsConstructor
+@NoArgsConstructor
public class GetAppIdResponse {
private List<String> appIds;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java
similarity index 94%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java
index 1fd3cf2167..dcb8c1da4a 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadRequest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadRequest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -29,4 +29,5 @@ public class TaskInstanceLogFileDownloadRequest {
private long taskInstanceId;
private String taskInstanceLogAbsolutePath;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java
similarity index 94%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java
index ee805a35b5..ac1e5b0cfd 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogFileDownloadResponse.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogFileDownloadResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -27,4 +27,5 @@ import lombok.NoArgsConstructor;
public class TaskInstanceLogFileDownloadResponse {
private byte[] logBytes;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java
similarity index 89%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java
index 321353e5d3..2723ac5baa 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryRequest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryRequest.java
@@ -15,21 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
-@NoArgsConstructor
+@Builder
@AllArgsConstructor
+@NoArgsConstructor
public class TaskInstanceLogPageQueryRequest {
- private int taskInstanceId;
+ private Integer taskInstanceId;
private String taskInstanceLogAbsolutePath;
private int skipLineNum;
private int limit;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java
similarity index 94%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java
index 1e0d481ddd..ac769d7677 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-worker/src/main/java/org/apache/dolphinscheduler/extract/worker/transportor/TaskInstanceLogPageQueryResponse.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-common/src/main/java/org/apache/dolphinscheduler/extract/common/transportor/TaskInstanceLogPageQueryResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.worker.transportor;
+package org.apache.dolphinscheduler.extract.common.transportor;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
b/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
index 04afc0b1de..955bcd0a12 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
@@ -30,6 +30,11 @@
<artifactId>dolphinscheduler-extract-master</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-extract-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java
deleted file mode 100644
index a7f15b2ea8..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterLogService.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.master;
-
-import org.apache.dolphinscheduler.extract.base.RpcMethod;
-import org.apache.dolphinscheduler.extract.base.RpcService;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
-
-@RpcService
-public interface IMasterLogService {
-
- @RpcMethod
- LogicTaskInstanceLogFileDownloadResponse
getLogicTaskInstanceWholeLogFileBytes(LogicTaskInstanceLogFileDownloadRequest
logicTaskInstanceLogFileDownloadRequest);
-
- @RpcMethod
- LogicTaskInstanceLogPageQueryResponse
pageQueryLogicTaskInstanceLog(LogicTaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest);
-
- @RpcMethod
- void removeLogicTaskInstanceLog(String taskInstanceLogAbsolutePath);
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java
deleted file mode 100644
index 7fe4c5df24..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadRequest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.master.transportor;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class LogicTaskInstanceLogFileDownloadRequest {
-
- private long taskInstanceId;
-
- private String taskInstanceLogAbsolutePath;
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java
deleted file mode 100644
index fe80526e55..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogFileDownloadResponse.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.master.transportor;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class LogicTaskInstanceLogFileDownloadResponse {
-
- private byte[] logBytes;
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java
deleted file mode 100644
index 5a8d3da808..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryRequest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.master.transportor;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class LogicTaskInstanceLogPageQueryRequest {
-
- private long taskInstanceId;
-
- private String taskInstanceLogAbsolutePath;
-
- private int skipLineNum;
- private int limit;
-}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java
deleted file mode 100644
index 1d4a2b6b55..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskInstanceLogPageQueryResponse.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.extract.master.transportor;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class LogicTaskInstanceLogPageQueryResponse {
-
- private String logContent;
-
-}
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
b/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
index da5fbd3b3f..0a42b73a1d 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-worker/pom.xml
@@ -30,6 +30,11 @@
<artifactId>dolphinscheduler-extract-worker</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-extract-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
diff --git a/dolphinscheduler-extract/pom.xml b/dolphinscheduler-extract/pom.xml
index dca4fd6623..09be31ebcc 100644
--- a/dolphinscheduler-extract/pom.xml
+++ b/dolphinscheduler-extract/pom.xml
@@ -34,6 +34,7 @@
<module>dolphinscheduler-extract-master</module>
<module>dolphinscheduler-extract-worker</module>
<module>dolphinscheduler-extract-alert</module>
+ <module>dolphinscheduler-extract-common</module>
</modules>
</project>
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java
index 6d908d16ac..0fa7cbffd9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterLogServiceImpl.java
@@ -17,14 +17,17 @@
package org.apache.dolphinscheduler.server.master.rpc;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
-import org.apache.dolphinscheduler.extract.master.IMasterLogService;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
+import org.apache.dolphinscheduler.extract.common.ILogService;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
-import java.io.File;
+import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -33,18 +36,18 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
-public class MasterLogServiceImpl implements IMasterLogService {
+public class MasterLogServiceImpl implements ILogService {
@Override
- public LogicTaskInstanceLogFileDownloadResponse
getLogicTaskInstanceWholeLogFileBytes(LogicTaskInstanceLogFileDownloadRequest
logicTaskInstanceLogFileDownloadRequest) {
+ public TaskInstanceLogFileDownloadResponse
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest
logicTaskInstanceLogFileDownloadRequest) {
byte[] bytes =
LogUtils.getFileContentBytes(logicTaskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
// todo: if file not exists, return error result
- return new LogicTaskInstanceLogFileDownloadResponse(bytes);
+ return new TaskInstanceLogFileDownloadResponse(bytes);
}
@Override
- public LogicTaskInstanceLogPageQueryResponse
pageQueryLogicTaskInstanceLog(LogicTaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest) {
+ public TaskInstanceLogPageQueryResponse
pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest) {
List<String> lines = LogUtils.readPartFileContent(
taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
@@ -52,18 +55,16 @@ public class MasterLogServiceImpl implements
IMasterLogService {
taskInstanceLogPageQueryRequest.getLimit());
String logContent = LogUtils.rollViewLogLines(lines);
- return new LogicTaskInstanceLogPageQueryResponse(logContent);
+ return new TaskInstanceLogPageQueryResponse(logContent);
}
@Override
- public void removeLogicTaskInstanceLog(String taskInstanceLogAbsolutePath)
{
- File taskLogFile = new File(taskInstanceLogAbsolutePath);
- try {
- if (taskLogFile.exists()) {
- taskLogFile.delete();
- }
- } catch (Exception e) {
- log.error("Remove LogicTaskInstanceLog error", e);
- }
+ public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
+ return new GetAppIdResponse(Collections.emptyList());
+ }
+
+ @Override
+ public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
+ FileUtils.deleteFile(taskInstanceLogAbsolutePath);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 0dc1b16679..86515d032d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse;
+import org.apache.dolphinscheduler.extract.common.ILogService;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -274,10 +274,10 @@ public class WorkerFailoverService {
.create();
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn
or k8s job");
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
GetAppIdResponse getAppIdResponse =
- iWorkerLogService.getAppId(new
GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath()));
+ iLogService.getAppId(new
GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath()));
ProcessUtils.killApplication(getAppIdResponse.getAppIds(),
taskExecutionContext);
} catch (Exception ex) {
log.error("Kill yarn task error", ex);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 760ed1429a..d065d62f53 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -116,10 +116,9 @@ import
org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
-import org.apache.dolphinscheduler.extract.master.IMasterLogService;
+import org.apache.dolphinscheduler.extract.common.ILogService;
import
org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import
org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
-import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -130,7 +129,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@@ -516,15 +514,9 @@ public class ProcessServiceImpl implements ProcessService {
if (StringUtils.isEmpty(taskInstance.getHost()) ||
StringUtils.isEmpty(taskLogPath)) {
continue;
}
- if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
- masterLogService.removeLogicTaskInstanceLog(taskLogPath);
- } else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
- .getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
- iWorkerLogService.removeTaskInstanceLog(taskLogPath);
- }
+ ILogService iLogService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
+ iLogService.removeTaskInstanceLog(taskLogPath);
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
index 405d79a0fb..24b5477cce 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
@@ -20,19 +20,19 @@ package org.apache.dolphinscheduler.server.worker.rpc;
import static
org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdRequest;
-import org.apache.dolphinscheduler.extract.worker.transportor.GetAppIdResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
-import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
+import org.apache.dolphinscheduler.extract.common.ILogService;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
+import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
+import
org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import java.io.File;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
-public class WorkerLogServiceImpl implements IWorkerLogService {
+public class WorkerLogServiceImpl implements ILogService {
@Override
public TaskInstanceLogFileDownloadResponse
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest) {
- byte[] bytes = org.apache.dolphinscheduler.common.utils.LogUtils
+ byte[] bytes = LogUtils
.getFileContentBytes(taskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
// todo: if file not exists, return error result
return new TaskInstanceLogFileDownloadResponse(bytes);
@@ -53,12 +53,12 @@ public class WorkerLogServiceImpl implements
IWorkerLogService {
@Override
public TaskInstanceLogPageQueryResponse
pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest) {
- List<String> lines =
org.apache.dolphinscheduler.common.utils.LogUtils.readPartFileContent(
+ List<String> lines = LogUtils.readPartFileContent(
taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
taskInstanceLogPageQueryRequest.getSkipLineNum(),
taskInstanceLogPageQueryRequest.getLimit());
- String logContent =
org.apache.dolphinscheduler.common.utils.LogUtils.rollViewLogLines(lines);
+ String logContent = LogUtils.rollViewLogLines(lines);
return new TaskInstanceLogPageQueryResponse(logContent);
}
@@ -68,20 +68,13 @@ public class WorkerLogServiceImpl implements
IWorkerLogService {
TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId());
String appInfoPath = taskExecutionContext.getAppInfoPath();
String logPath = getAppIdRequest.getLogPath();
- List<String> appIds = LogUtils.getAppIds(logPath, appInfoPath,
+ List<String> appIds =
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath,
appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
return new GetAppIdResponse(appIds);
}
@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
- File taskLogFile = new File(taskInstanceLogAbsolutePath);
- try {
- if (taskLogFile.exists()) {
- taskLogFile.delete();
- }
- } catch (Exception e) {
- log.error("Remove TaskInstanceLog error", e);
- }
+ FileUtils.deleteFile(taskInstanceLogAbsolutePath);
}
}