This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 817d94d1e [Bug] Fix the tail buffer memory leak (#1664)
817d94d1e is described below
commit 817d94d1e5d0ea5cbe18f0f531492abf168bece8
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Sep 23 00:26:18 2022 +0800
[Bug] Fix the tail buffer memory leak (#1664)
---
.../core/service/impl/ProjectServiceImpl.java | 80 +++++++++++-----------
1 file changed, 41 insertions(+), 39 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index faf19c098..e695c14cb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -53,6 +53,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -93,7 +94,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
@Override
public RestResponse create(Project project) {
- LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper();
+ LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(Project::getName, project.getName());
long count = count(queryWrapper);
RestResponse response = RestResponse.success();
@@ -149,7 +150,7 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
public boolean delete(Long id) {
Project project = getById(id);
AssertUtils.state(project != null);
- LambdaQueryWrapper<Application> queryWrapper = new
LambdaQueryWrapper<Application>();
+ LambdaQueryWrapper<Application> queryWrapper = new
LambdaQueryWrapper<>();
queryWrapper.eq(Application::getProjectId, id);
long count = applicationService.count(queryWrapper);
if (count > 0) {
@@ -177,36 +178,35 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
StringBuilder builder = new StringBuilder();
tailBuffer.put(id, builder.append(project.getLog4BuildStart()));
boolean cloneSuccess = cloneSourceCode(project, socketId);
- if (cloneSuccess) {
- executorService.execute(() -> {
- boolean build = projectBuild(project, socketId);
- if (build) {
- this.baseMapper.successBuild(project);
- // 发布到apps下
- try {
- this.deploy(project);
- // 更新application的发布状态.
- List<Application> applications =
getApplications(project);
- // 更新部署状态
- FlinkTrackingTask.refreshTracking(() ->
applications.forEach((app) -> {
- log.info("update deploy by project: {},
appName:{}", project.getName(), app.getJobName());
- app.setLaunch(LaunchState.NEED_LAUNCH.get());
- app.setBuild(true);
- this.applicationService.updateLaunch(app);
- }));
- } catch (Exception e) {
- this.baseMapper.failureBuild(project);
- log.error("deploy error, project name: {}, detail:
{}", project.getName(), e.getMessage());
- }
- } else {
- this.baseMapper.failureBuild(project);
- log.error("build error, project name: {} ",
project.getName());
- }
- });
- } else {
+ if (!cloneSuccess) {
log.error("[StreamPark] clone or pull error.");
+ tailBuffer.remove(project.getId());
this.baseMapper.failureBuild(project);
+ return;
}
+ executorService.execute(() -> {
+ boolean build = projectBuild(project, socketId);
+ if (!build) {
+ this.baseMapper.failureBuild(project);
+ log.error("build error, project name: {} ", project.getName());
+ return;
+ }
+ this.baseMapper.successBuild(project);
+ try {
+ this.deploy(project);
+ List<Application> applications = getApplications(project);
+ // Update the deploy state
+ FlinkTrackingTask.refreshTracking(() ->
applications.forEach((app) -> {
+ log.info("update deploy by project: {}, appName:{}",
project.getName(), app.getJobName());
+ app.setLaunch(LaunchState.NEED_LAUNCH.get());
+ app.setBuild(true);
+ this.applicationService.updateLaunch(app);
+ }));
+ } catch (Exception e) {
+ this.baseMapper.failureBuild(project);
+ log.error("deploy error, project name: {}, detail: {}",
project.getName(), e.getMessage());
+ }
+ });
}
private void deploy(Project project) throws Exception {
@@ -458,17 +458,19 @@ public class ProjectServiceImpl extends
ServiceImpl<ProjectMapper, Project>
private boolean projectBuild(Project project, String socketId) {
StringBuilder builder = tailBuffer.get(project.getId());
- int code = CommandUtils.execute(project.getMavenWorkHome(),
Arrays.asList(project.getMavenArgs()), (line) -> {
- builder.append(line).append("\n");
- if (tailOutMap.containsKey(project.getId())) {
- if (tailBeginning.containsKey(project.getId())) {
- tailBeginning.remove(project.getId());
- Arrays.stream(builder.toString().split("\n"))
- .forEach(out ->
WebSocketEndpoint.writeMessage(socketId, out));
+ int code = CommandUtils.execute(project.getMavenWorkHome(),
+ Collections.singletonList(project.getMavenArgs()),
+ (line) -> {
+ builder.append(line).append("\n");
+ if (tailOutMap.containsKey(project.getId())) {
+ if (tailBeginning.containsKey(project.getId())) {
+ tailBeginning.remove(project.getId());
+ Arrays.stream(builder.toString().split("\n"))
+ .forEach(out ->
WebSocketEndpoint.writeMessage(socketId, out));
+ }
+ WebSocketEndpoint.writeMessage(socketId, line);
}
- WebSocketEndpoint.writeMessage(socketId, line);
- }
- });
+ });
closeBuildLog(project.getId());
log.info(builder.toString());
tailBuffer.remove(project.getId());