This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 443a6d1 delete log files while deleting process instances #2463
(#2693)
443a6d1 is described below
commit 443a6d193553bfb15029b7db0851ecb9839238a2
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue May 12 16:54:29 2020 +0800
delete log files while deleting process instances #2463 (#2693)
* script variable has "processDefinitionId" is error #2664
* blank in email and left font align #2648
* delete log files while deleting process instances #2463
* delete log files while deleting process instances #2463
* delete log files while deleting process instances #2463
* delete log files while deleting process instances #2463
Co-authored-by: qiaozhanwei <[email protected]>
---
.../api/service/ProcessInstanceService.java | 5 +-
.../remote/command/CommandType.java | 2 +-
.../command/log/RemoveTaskLogRequestCommand.java | 63 ++++++++++++++++++++++
.../command/log/RemoveTaskLogResponseCommand.java | 63 ++++++++++++++++++++++
.../server/log/LoggerRequestProcessor.java | 35 +++++++++---
.../dolphinscheduler/server/log/LoggerServer.java | 1 +
.../service/log/LogClientService.java | 29 ++++++++++
.../service/process/ProcessService.java | 36 ++++++++++++-
8 files changed, 221 insertions(+), 13 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index a5a3413..f8ad4c6 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processInstanceId);
- List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processInstanceId);
-
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
return result;
@@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService
{
+ processService.removeTaskLogFile(processInstanceId);
// delete database cascade
int delete =
processService.deleteWorkProcessInstanceById(processInstanceId);
+
+
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index c8d5659..d1ffc65 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
new file mode 100644
index 0000000..4cf6626
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * remove task log request command
+ */
+public class RemoveTaskLogRequestCommand implements Serializable {
+
+ /**
+ * log path
+ */
+ private String path;
+
+ public RemoveTaskLogRequestCommand() {
+ }
+
+ public RemoveTaskLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command(){
+ Command command = new Command();
+ command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
new file mode 100644
index 0000000..a72f84a
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * remove task log response command
+ */
+public class RemoveTaskLogResponseCommand implements Serializable {
+
+ /**
+ * remove task log status
+ */
+ private Boolean status;
+
+ public RemoveTaskLogResponseCommand() {
+ }
+
+ public RemoveTaskLogResponseCommand(Boolean status) {
+ this.status = status;
+ }
+
+ public Boolean getStatus() {
+ return status;
+ }
+
+ public void setStatus(Boolean status) {
+ this.status = status;
+ }
+
+ /**
+ * package response command
+ *
+ * @return command
+ */
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index 44ec68f..a73e473 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
*/
final CommandType commandType = command.getType();
switch (commandType){
- case GET_LOG_BYTES_REQUEST:
- GetLogBytesRequestCommand getLogRequest =
FastJsonSerializer.deserialize(
- command.getBody(),
GetLogBytesRequestCommand.class);
- byte[] bytes =
getFileContentBytes(getLogRequest.getPath());
- GetLogBytesResponseCommand getLogResponse = new
GetLogBytesResponseCommand(bytes);
-
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
- break;
- case VIEW_WHOLE_LOG_REQUEST:
+ case GET_LOG_BYTES_REQUEST:
+ GetLogBytesRequestCommand getLogRequest =
FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesRequestCommand.class);
+ byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+ GetLogBytesResponseCommand getLogResponse = new
GetLogBytesResponseCommand(bytes);
+
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest =
FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
@@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements
NettyRequestProcessor {
RollViewLogResponseCommand rollViewLogRequestResponse = new
RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
+ case REMOVE_TAK_LOG_REQUEST:
+ RemoveTaskLogRequestCommand removeTaskLogRequest =
FastJsonSerializer.deserialize(
+ command.getBody(), RemoveTaskLogRequestCommand.class);
+
+ String taskLogPath = removeTaskLogRequest.getPath();
+
+ File taskLogFile = new File(taskLogPath);
+ Boolean status = true;
+ try {
+ if (taskLogFile.exists()){
+ taskLogFile.delete();
+ }
+ }catch (Exception e){
+ status = false;
+ }
+
+ RemoveTaskLogResponseCommand removeTaskLogResponse = new
RemoveTaskLogResponseCommand(status);
+
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
+ break;
default:
throw new IllegalArgumentException("unknown commandType");
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
index 3520fb0..f1999e6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -55,6 +55,7 @@ public class LoggerServer {
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,
requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,
requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,
requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST,
requestProcessor, requestProcessor.getExecutor());
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 8e63c89..4567d80 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -144,4 +144,33 @@ public class LogClientService {
}
return result;
}
+
+
+ /**
+ * remove task log
+ * @param host host
+ * @param port port
+ * @param path path
+ * @return remove task status
+ */
+ public Boolean removeTaskLog(String host, int port, String path) {
+ logger.info("log path {}", path);
+ RemoveTaskLogRequestCommand request = new
RemoveTaskLogRequestCommand(path);
+ Boolean result = false;
+ final Host address = new Host(host, port);
+ try {
+ Command command = request.convert2Command();
+ Command response = this.client.sendSync(address, command,
LOG_REQUEST_TIMEOUT);
+ if(response != null){
+ RemoveTaskLogResponseCommand taskLogResponse =
FastJsonSerializer.deserialize(
+ response.getBody(),
RemoveTaskLogResponseCommand.class);
+ return taskLogResponse.getStatus();
+ }
+ } catch (Exception e) {
+ logger.error("remove task log error", e);
+ } finally {
+ this.client.closeChannel(address);
+ }
+ return result;
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index d9faf8c..f0ae76e 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -29,6 +29,8 @@ import
org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
+import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
@@ -238,7 +241,7 @@ public class ProcessService {
* @param defineId
* @return
*/
- public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
+ public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
ProcessDefinition processDefinition =
processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.info("process define not exists");
@@ -293,9 +296,10 @@ public class ProcessService {
List<Integer> subProcessIdList =
processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
- for(Integer subId : subProcessIdList ){
+ for(Integer subId : subProcessIdList){
deleteAllSubWorkProcessByParentId(subId);
deleteWorkProcessMapByParentId(subId);
+ removeTaskLogFile(subId);
deleteWorkProcessInstanceById(subId);
}
return 1;
@@ -303,6 +307,34 @@ public class ProcessService {
/**
+ * remove task log file
+ * @param processInstanceId processInstanceId
+ */
+ public void removeTaskLogFile(Integer processInstanceId){
+
+ LogClientService logClient = new LogClientService();
+
+ List<TaskInstance> taskInstanceList =
findValidTaskListByProcessId(processInstanceId);
+
+ if (CollectionUtils.isEmpty(taskInstanceList)){
+ return;
+ }
+
+ for (TaskInstance taskInstance : taskInstanceList){
+ String taskLogPath = taskInstance.getLogPath();
+ if (StringUtils.isEmpty(taskInstance.getHost())){
+ continue;
+ }
+ String ip = Host.of(taskInstance.getHost()).getIp();
+ int port = Constants.RPC_PORT;
+
+ // remove task log from loggerserver
+ logClient.removeTaskLog(ip,port,taskLogPath);
+ }
+ }
+
+
+ /**
* calculate sub process number in the process define.
* @param processDefinitionId processDefinitionId
* @return process thread num count