This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new 443a6d1  delete log files while deleting process instances #2463 
(#2693)
443a6d1 is described below

commit 443a6d193553bfb15029b7db0851ecb9839238a2
Author: qiaozhanwei <[email protected]>
AuthorDate: Tue May 12 16:54:29 2020 +0800

    delete log files while deleting process instances #2463 (#2693)
    
    * script variable has "processDefinitionId" is error #2664
    
    * blank in eamil and left font align  #2648
    
    * delete log files while deleting process instances #2463
    
    * delete log files while deleting process instances #2463
    
    * delete log files while deleting process instances #2463
    
    * delete log files while deleting process instances #2463
    
    Co-authored-by: qiaozhanwei <[email protected]>
---
 .../api/service/ProcessInstanceService.java        |  5 +-
 .../remote/command/CommandType.java                |  2 +-
 .../command/log/RemoveTaskLogRequestCommand.java   | 63 ++++++++++++++++++++++
 .../command/log/RemoveTaskLogResponseCommand.java  | 63 ++++++++++++++++++++++
 .../server/log/LoggerRequestProcessor.java         | 35 +++++++++---
 .../dolphinscheduler/server/log/LoggerServer.java  |  1 +
 .../service/log/LogClientService.java              | 29 ++++++++++
 .../service/process/ProcessService.java            | 36 ++++++++++++-
 8 files changed, 221 insertions(+), 13 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index a5a3413..f8ad4c6 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService {
             return checkResult;
         }
         ProcessInstance processInstance = 
processService.findProcessInstanceDetailById(processInstanceId);
-        List<TaskInstance> taskInstanceList = 
processService.findValidTaskListByProcessId(processInstanceId);
-
         if (null == processInstance) {
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, 
processInstanceId);
             return result;
@@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService 
{
 
 
 
+        processService.removeTaskLogFile(processInstanceId);
         // delete database cascade
         int delete = 
processService.deleteWorkProcessInstanceById(processInstanceId);
+
+
         processService.deleteAllSubWorkProcessByParentId(processInstanceId);
         processService.deleteWorkProcessMapByParentId(processInstanceId);
 
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index c8d5659..d1ffc65 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
  
    */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    TASK_EXECUTE_REQUEST,

    /**
     * execute task ack
     */
    TASK_EXECUTE_ACK,

    /**
     * execute task response
     */
    TASK_EXECUTE_RESPONSE,

    /**
     * kill task
     */
    TASK_KILL_REQUEST,

    /**
     * kill task response
     */
    TASK_KILL_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     * remove task log request,
     */
    REMOVE_TAK_LOG_REQUEST,

    /**
     * remove task log respons
 e
     */
    REMOVE_TAK_LOG_RESPONSE,

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    TASK_EXECUTE_REQUEST,

    /**
     * execute task ack
     */
    TASK_EXECUTE_ACK,

    /**
     * execute task response
     */
    TASK_EXECUTE_RESPONSE,

    /**
     * kill task
     */
    TASK_KILL_REQUEST,

    /**
     * kill task response
     */
    TASK_KILL_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
new file mode 100644
index 0000000..4cf6626
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ *  remove task log request command
+ */
+public class RemoveTaskLogRequestCommand implements Serializable {
+
+    /**
+     *  log path
+     */
+    private String path;
+
+    public RemoveTaskLogRequestCommand() {
+    }
+
+    public RemoveTaskLogRequestCommand(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    /**
+     * package request command
+     *
+     * @return command
+     */
+    public Command convert2Command(){
+        Command command = new Command();
+        command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
new file mode 100644
index 0000000..a72f84a
--- /dev/null
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ *  remove task log request command
+ */
+public class RemoveTaskLogResponseCommand implements Serializable {
+
+    /**
+     *  log path
+     */
+    private Boolean status;
+
+    public RemoveTaskLogResponseCommand() {
+    }
+
+    public RemoveTaskLogResponseCommand(Boolean status) {
+        this.status = status;
+    }
+
+    public Boolean getStatus() {
+        return status;
+    }
+
+    public void setStatus(Boolean status) {
+        this.status = status;
+    }
+
+    /**
+     * package request command
+     *
+     * @return command
+     */
+    public Command convert2Command(long opaque){
+        Command command = new Command(opaque);
+        command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index 44ec68f..a73e473 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
          */
         final CommandType commandType = command.getType();
         switch (commandType){
-                case GET_LOG_BYTES_REQUEST:
-                    GetLogBytesRequestCommand getLogRequest = 
FastJsonSerializer.deserialize(
-                            command.getBody(), 
GetLogBytesRequestCommand.class);
-                    byte[] bytes = 
getFileContentBytes(getLogRequest.getPath());
-                    GetLogBytesResponseCommand getLogResponse = new 
GetLogBytesResponseCommand(bytes);
-                    
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
-                    break;
-                case VIEW_WHOLE_LOG_REQUEST:
+            case GET_LOG_BYTES_REQUEST:
+                GetLogBytesRequestCommand getLogRequest = 
FastJsonSerializer.deserialize(
+                        command.getBody(), GetLogBytesRequestCommand.class);
+                byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+                GetLogBytesResponseCommand getLogResponse = new 
GetLogBytesResponseCommand(bytes);
+                
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+                break;
+            case VIEW_WHOLE_LOG_REQUEST:
                 ViewLogRequestCommand viewLogRequest = 
FastJsonSerializer.deserialize(
                         command.getBody(), ViewLogRequestCommand.class);
                 String msg = readWholeFileContent(viewLogRequest.getPath());
@@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements 
NettyRequestProcessor {
                 RollViewLogResponseCommand rollViewLogRequestResponse = new 
RollViewLogResponseCommand(builder.toString());
                 
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
                 break;
+            case REMOVE_TAK_LOG_REQUEST:
+                RemoveTaskLogRequestCommand removeTaskLogRequest = 
FastJsonSerializer.deserialize(
+                        command.getBody(), RemoveTaskLogRequestCommand.class);
+
+                String taskLogPath = removeTaskLogRequest.getPath();
+
+                File taskLogFile = new File(taskLogPath);
+                Boolean status = true;
+                try {
+                    if (taskLogFile.exists()){
+                        taskLogFile.delete();
+                    }
+                }catch (Exception e){
+                    status = false;
+                }
+
+                RemoveTaskLogResponseCommand removeTaskLogResponse = new 
RemoveTaskLogResponseCommand(status);
+                
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
+                break;
             default:
                 throw new IllegalArgumentException("unknown commandType");
         }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
index 3520fb0..f1999e6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -55,6 +55,7 @@ public class LoggerServer {
         this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, 
requestProcessor, requestProcessor.getExecutor());
         this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, 
requestProcessor, requestProcessor.getExecutor());
         this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, 
requestProcessor, requestProcessor.getExecutor());
+        this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, 
requestProcessor, requestProcessor.getExecutor());
     }
 
     /**
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 8e63c89..4567d80 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -144,4 +144,33 @@ public class LogClientService {
         }
         return result;
     }
+
+
+    /**
+     * remove task log
+     * @param host host
+     * @param port port
+     * @param path path
+     * @return remove task status
+     */
+    public Boolean removeTaskLog(String host, int port, String path) {
+        logger.info("log path {}", path);
+        RemoveTaskLogRequestCommand request = new 
RemoveTaskLogRequestCommand(path);
+        Boolean result = false;
+        final Host address = new Host(host, port);
+        try {
+            Command command = request.convert2Command();
+            Command response = this.client.sendSync(address, command, 
LOG_REQUEST_TIMEOUT);
+            if(response != null){
+                RemoveTaskLogResponseCommand taskLogResponse = 
FastJsonSerializer.deserialize(
+                        response.getBody(), 
RemoveTaskLogResponseCommand.class);
+                return taskLogResponse.getStatus();
+            }
+        } catch (Exception e) {
+            logger.error("remove task log error", e);
+        } finally {
+            this.client.closeChannel(address);
+        }
+        return result;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index d9faf8c..f0ae76e 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -29,6 +29,8 @@ import 
org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
@@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.io.File;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -238,7 +241,7 @@ public class ProcessService {
      * @param defineId
      * @return
      */
-    public  List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
+    public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
         ProcessDefinition processDefinition = 
processDefineMapper.selectById(defineId);
         if (processDefinition == null) {
             logger.info("process define not exists");
@@ -293,9 +296,10 @@ public class ProcessService {
 
         List<Integer> subProcessIdList = 
processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
 
-        for(Integer subId : subProcessIdList ){
+        for(Integer subId : subProcessIdList){
             deleteAllSubWorkProcessByParentId(subId);
             deleteWorkProcessMapByParentId(subId);
+            removeTaskLogFile(subId);
             deleteWorkProcessInstanceById(subId);
         }
         return 1;
@@ -303,6 +307,34 @@ public class ProcessService {
 
 
     /**
+     * remove task log file
+     * @param processInstanceId processInstanceId
+     */
+    public void removeTaskLogFile(Integer processInstanceId){
+
+        LogClientService logClient = new LogClientService();
+
+        List<TaskInstance> taskInstanceList = 
findValidTaskListByProcessId(processInstanceId);
+
+        if (CollectionUtils.isEmpty(taskInstanceList)){
+            return;
+        }
+
+        for (TaskInstance taskInstance : taskInstanceList){
+            String taskLogPath = taskInstance.getLogPath();
+            if (StringUtils.isEmpty(taskInstance.getHost())){
+                continue;
+            }
+            String ip = Host.of(taskInstance.getHost()).getIp();
+            int port = Constants.RPC_PORT;
+
+            // remove task log from loggerserver
+            logClient.removeTaskLog(ip,port,taskLogPath);
+        }
+    }
+
+
+    /**
      * calculate sub process number in the process define.
      * @param processDefinitionId processDefinitionId
      * @return process thread num count

Reply via email to