This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 817d94d1e [Bug] Fix the tail buffer memory leak (#1664)
817d94d1e is described below

commit 817d94d1e5d0ea5cbe18f0f531492abf168bece8
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Sep 23 00:26:18 2022 +0800

    [Bug] Fix the tail buffer memory leak (#1664)
---
 .../core/service/impl/ProjectServiceImpl.java      | 80 +++++++++++-----------
 1 file changed, 41 insertions(+), 39 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index faf19c098..e695c14cb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -53,6 +53,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -93,7 +94,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
 
     @Override
     public RestResponse create(Project project) {
-        LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper();
+        LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(Project::getName, project.getName());
         long count = count(queryWrapper);
         RestResponse response = RestResponse.success();
@@ -149,7 +150,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
     public boolean delete(Long id) {
         Project project = getById(id);
         AssertUtils.state(project != null);
-        LambdaQueryWrapper<Application> queryWrapper = new 
LambdaQueryWrapper<Application>();
+        LambdaQueryWrapper<Application> queryWrapper = new 
LambdaQueryWrapper<>();
         queryWrapper.eq(Application::getProjectId, id);
         long count = applicationService.count(queryWrapper);
         if (count > 0) {
@@ -177,36 +178,35 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
         StringBuilder builder = new StringBuilder();
         tailBuffer.put(id, builder.append(project.getLog4BuildStart()));
         boolean cloneSuccess = cloneSourceCode(project, socketId);
-        if (cloneSuccess) {
-            executorService.execute(() -> {
-                boolean build = projectBuild(project, socketId);
-                if (build) {
-                    this.baseMapper.successBuild(project);
-                    // 发布到apps下
-                    try {
-                        this.deploy(project);
-                        // 更新application的发布状态.
-                        List<Application> applications = 
getApplications(project);
-                        // 更新部署状态
-                        FlinkTrackingTask.refreshTracking(() -> 
applications.forEach((app) -> {
-                            log.info("update deploy by project: {}, 
appName:{}", project.getName(), app.getJobName());
-                            app.setLaunch(LaunchState.NEED_LAUNCH.get());
-                            app.setBuild(true);
-                            this.applicationService.updateLaunch(app);
-                        }));
-                    } catch (Exception e) {
-                        this.baseMapper.failureBuild(project);
-                        log.error("deploy error, project name: {}, detail: 
{}", project.getName(), e.getMessage());
-                    }
-                } else {
-                    this.baseMapper.failureBuild(project);
-                    log.error("build error, project name: {} ", 
project.getName());
-                }
-            });
-        } else {
+        if (!cloneSuccess) {
             log.error("[StreamPark] clone or pull error.");
+            tailBuffer.remove(project.getId());
             this.baseMapper.failureBuild(project);
+            return;
         }
+        executorService.execute(() -> {
+            boolean build = projectBuild(project, socketId);
+            if (!build) {
+                this.baseMapper.failureBuild(project);
+                log.error("build error, project name: {} ", project.getName());
+                return;
+            }
+            this.baseMapper.successBuild(project);
+            try {
+                this.deploy(project);
+                List<Application> applications = getApplications(project);
+                // Update the deploy state
+                FlinkTrackingTask.refreshTracking(() -> 
applications.forEach((app) -> {
+                    log.info("update deploy by project: {}, appName:{}", 
project.getName(), app.getJobName());
+                    app.setLaunch(LaunchState.NEED_LAUNCH.get());
+                    app.setBuild(true);
+                    this.applicationService.updateLaunch(app);
+                }));
+            } catch (Exception e) {
+                this.baseMapper.failureBuild(project);
+                log.error("deploy error, project name: {}, detail: {}", 
project.getName(), e.getMessage());
+            }
+        });
     }
 
     private void deploy(Project project) throws Exception {
@@ -458,17 +458,19 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
 
     private boolean projectBuild(Project project, String socketId) {
         StringBuilder builder = tailBuffer.get(project.getId());
-        int code = CommandUtils.execute(project.getMavenWorkHome(), 
Arrays.asList(project.getMavenArgs()), (line) -> {
-            builder.append(line).append("\n");
-            if (tailOutMap.containsKey(project.getId())) {
-                if (tailBeginning.containsKey(project.getId())) {
-                    tailBeginning.remove(project.getId());
-                    Arrays.stream(builder.toString().split("\n"))
-                        .forEach(out -> 
WebSocketEndpoint.writeMessage(socketId, out));
+        int code = CommandUtils.execute(project.getMavenWorkHome(),
+            Collections.singletonList(project.getMavenArgs()),
+            (line) -> {
+                builder.append(line).append("\n");
+                if (tailOutMap.containsKey(project.getId())) {
+                    if (tailBeginning.containsKey(project.getId())) {
+                        tailBeginning.remove(project.getId());
+                        Arrays.stream(builder.toString().split("\n"))
+                            .forEach(out -> 
WebSocketEndpoint.writeMessage(socketId, out));
+                    }
+                    WebSocketEndpoint.writeMessage(socketId, line);
                 }
-                WebSocketEndpoint.writeMessage(socketId, line);
-            }
-        });
+            });
         closeBuildLog(project.getId());
         log.info(builder.toString());
         tailBuffer.remove(project.getId());

Reply via email to