This is an automated email from the ASF dual-hosted git repository.
arshad pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 20ac01b9 [Improve]Optimize the way of obtaining user information. (#267)
20ac01b9 is described below
commit 20ac01b9ef98ae746bd368d0a1331c79e2b3fa15
Author: Jast <[email protected]>
AuthorDate: Wed Feb 5 13:58:42 2025 +0800
[Improve]Optimize the way of obtaining user information. (#267)
---
.../apache/seatunnel/app/config/AsyncConfig.java | 76 ++++++
.../seatunnel/app/config/ThreadPoolProperties.java | 54 ++++
.../apache/seatunnel/app/config/WebMvcConfig.java | 55 ++++
.../app/controller/JobConfigController.java | 4 +-
.../seatunnel/app/controller/JobController.java | 13 +-
.../app/controller/JobDefinitionController.java | 7 +-
.../app/controller/JobExecutorController.java | 22 +-
.../app/controller/JobMetricsController.java | 13 +-
.../controller/SeatunnelDatasourceController.java | 36 +--
.../app/controller/TaskInstanceController.java | 3 -
.../app/controller/VirtualTableController.java | 19 +-
.../app/interceptor/AuthenticationInterceptor.java | 6 +-
.../app/interceptor/UserContextInterceptor.java | 79 ++++++
.../UserContext.java} | 26 +-
.../seatunnel/app/service/IDatasourceService.java | 31 +--
.../seatunnel/app/service/IJobConfigService.java | 3 +-
.../app/service/IJobDefinitionService.java | 2 +-
.../seatunnel/app/service/IJobExecutorService.java | 6 +-
.../seatunnel/app/service/IJobInstanceService.java | 9 +-
.../seatunnel/app/service/IJobMetricsService.java | 15 +-
.../apache/seatunnel/app/service/IJobService.java | 7 +-
.../app/service/ITaskInstanceService.java | 7 +-
.../app/service/IVirtualTableService.java | 6 +-
.../app/service/impl/DatasourceServiceImpl.java | 42 ++--
.../app/service/impl/JobConfigServiceImpl.java | 4 +-
.../app/service/impl/JobDefinitionServiceImpl.java | 5 +-
.../app/service/impl/JobExecutorServiceImpl.java | 23 +-
.../app/service/impl/JobInstanceServiceImpl.java | 12 +-
.../app/service/impl/JobMetricsServiceImpl.java | 103 +++-----
.../seatunnel/app/service/impl/JobServiceImpl.java | 17 +-
.../app/service/impl/TaskInstanceServiceImpl.java | 21 +-
.../app/service/impl/VirtualTableServiceImpl.java | 42 ++--
.../ServletUtils.java} | 18 +-
.../src/main/resources/application.yml | 5 +
.../app/security/AsyncUserContextTest.java | 279 +++++++++++++++++++++
35 files changed, 757 insertions(+), 313 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/AsyncConfig.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/AsyncConfig.java
new file mode 100644
index 00000000..8a665210
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/AsyncConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.config;
+
+import org.apache.seatunnel.app.dal.entity.User;
+import org.apache.seatunnel.app.security.UserContext;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskDecorator;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfig implements AsyncConfigurer {
+
+ private final ThreadPoolProperties threadPoolProperties;
+
+ public AsyncConfig(ThreadPoolProperties threadPoolProperties) {
+ this.threadPoolProperties = threadPoolProperties;
+ }
+
+ @Bean
+ public ThreadPoolTaskExecutor taskExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
+ executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
+ executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
+ executor.setThreadNamePrefix("AsyncThread-");
+ executor.setTaskDecorator(new ContextCopyingDecorator());
+ executor.initialize();
+ return executor;
+ }
+
+ @Override
+ public Executor getAsyncExecutor() {
+ return taskExecutor();
+ }
+
+ public static class ContextCopyingDecorator implements TaskDecorator {
+ @Override
+ public Runnable decorate(Runnable runnable) {
+ try {
+ User user = UserContext.getUser();
+ return () -> {
+ try {
+ UserContext.setUser(user);
+ runnable.run();
+ } finally {
+ UserContext.clear();
+ }
+ };
+ } catch (Exception e) {
+ return runnable;
+ }
+ }
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/ThreadPoolProperties.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/ThreadPoolProperties.java
new file mode 100644
index 00000000..d36fe309
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/ThreadPoolProperties.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConfigurationProperties(prefix = "spring.async-config")
+public class ThreadPoolProperties {
+ private int corePoolSize = 10;
+ private int maxPoolSize = 20;
+ private int queueCapacity = 100;
+
+ // getters and setters
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ public void setCorePoolSize(int corePoolSize) {
+ this.corePoolSize = corePoolSize;
+ }
+
+ public int getMaxPoolSize() {
+ return maxPoolSize;
+ }
+
+ public void setMaxPoolSize(int maxPoolSize) {
+ this.maxPoolSize = maxPoolSize;
+ }
+
+ public int getQueueCapacity() {
+ return queueCapacity;
+ }
+
+ public void setQueueCapacity(int queueCapacity) {
+ this.queueCapacity = queueCapacity;
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/WebMvcConfig.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/WebMvcConfig.java
new file mode 100644
index 00000000..a0a489b5
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/WebMvcConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.config;
+
+import org.apache.seatunnel.app.interceptor.UserContextInterceptor;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+/**
+ * Web MVC configuration for registering interceptors. This configuration ensures proper handling of
+ * user context in web requests.
+ */
+@Configuration
+public class WebMvcConfig implements WebMvcConfigurer {
+
+ /**
+ * Creates and registers the UserContextInterceptor bean.
+ *
+ * @return A new instance of UserContextInterceptor
+ */
+ @Bean
+ public UserContextInterceptor userContextInterceptor() {
+ return new UserContextInterceptor();
+ }
+
+ /**
+ * Configures the interceptors for the application. UserContextInterceptor is registered with a
+ * high order value to ensure it executes after the authentication interceptor.
+ *
+ * @param registry The interceptor registry
+ */
+ @Override
+ public void addInterceptors(InterceptorRegistry registry) {
+ registry.addInterceptor(userContextInterceptor())
+ .addPathPatterns("/**")
+ .order(100); // High order value ensures execution after authentication
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobConfigController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobConfigController.java
index 054f49ce..2ec31541 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobConfigController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobConfigController.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.app.service.IJobConfigService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -44,11 +43,10 @@ public class JobConfigController {
@PutMapping("/{jobVersionId}")
@ApiOperation(value = "update job config", httpMethod = "PUT")
Result<Void> updateJobConfig(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable
long jobVersionId,
@ApiParam(value = "jobConfig", required = true) @RequestBody
JobConfig jobConfig)
throws JsonProcessingException {
- jobConfigService.updateJobConfig(userId, jobVersionId, jobConfig);
+ jobConfigService.updateJobConfig(jobVersionId, jobConfig);
return Result.success();
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
index b7b70ba7..1ef7e989 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java
@@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -47,11 +46,9 @@ public class JobController {
value =
"Create a job, In jobDAG for inputPluginId and
targetPluginId use the plugin names instead of ids.",
httpMethod = "POST")
- public Result<Long> createJob(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
- @RequestBody JobCreateReq jobCreateRequest)
+ public Result<Long> createJob(@RequestBody JobCreateReq jobCreateRequest)
throws JsonProcessingException {
- return Result.success(jobCRUDService.createJob(userId,
jobCreateRequest));
+ return Result.success(jobCRUDService.createJob(jobCreateRequest));
}
@PutMapping("/update/{jobVersionId}")
@@ -59,21 +56,19 @@ public class JobController {
value = "Update a job, all the existing ids should be passed in
the request.",
httpMethod = "PUT")
public Result<Void> updateJob(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable
long jobVersionId,
@RequestBody JobCreateReq jobCreateReq)
throws JsonProcessingException {
- jobCRUDService.updateJob(userId, jobVersionId, jobCreateReq);
+ jobCRUDService.updateJob(jobVersionId, jobCreateReq);
return Result.success();
}
@GetMapping("/get/{jobVersionId}")
@ApiOperation(value = "Get a job detail.", httpMethod = "GET")
public Result<JobRes> getJob(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobVersionId", required = true) @PathVariable
long jobVersionId)
throws JsonProcessingException {
- JobRes jobRes = jobCRUDService.getJob(userId, jobVersionId);
+ JobRes jobRes = jobCRUDService.getJob(jobVersionId);
return Result.success(jobRes);
}
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobDefinitionController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobDefinitionController.java
index d5253f63..3da3bf7c 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobDefinitionController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobDefinitionController.java
@@ -31,7 +31,6 @@ import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -57,12 +56,10 @@ public class JobDefinitionController {
*/
@PostMapping
@ApiOperation(value = "create job definition", httpMethod = "POST")
- Result<Long> createJobDefinition(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
- @RequestBody JobReq jobReq)
+ Result<Long> createJobDefinition(@RequestBody JobReq jobReq)
throws CodeGenerateUtils.CodeGenerateException {
if (jobService.getJob(jobReq.getName()).isEmpty()) {
- return Result.success(jobService.createJob(userId, jobReq));
+ return Result.success(jobService.createJob(jobReq));
} else {
return Result.failure(SeatunnelErrorEnum.TASK_NAME_ALREADY_EXISTS);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index f7c8cea9..b8822428 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.server.common.SeatunnelException;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -57,22 +56,20 @@ public class JobExecutorController {
@PostMapping("/execute")
@ApiOperation(value = "Execute synchronization tasks", httpMethod = "POST")
public Result<Long> jobExecutor(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobDefineId", required = true)
@RequestParam("jobDefineId")
Long jobDefineId,
@RequestBody(required = false) JobExecParam executeParam) {
- return jobExecutorService.jobExecute(userId, jobDefineId,
executeParam);
+ return jobExecutorService.jobExecute(jobDefineId, executeParam);
}
@GetMapping("/resource")
@ApiOperation(value = "get the resource for job executor", httpMethod =
"GET")
public Result<JobExecutorRes> resource(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "Job define id", required = true) @RequestParam
Long jobDefineId)
throws IOException {
try {
JobExecutorRes executeResource =
- jobInstanceService.createExecuteResource(userId,
jobDefineId, null);
+ jobInstanceService.createExecuteResource(jobDefineId,
null);
return Result.success(executeResource);
} catch (Exception e) {
log.error("Get the resource for job executor error", e);
@@ -82,39 +79,34 @@ public class JobExecutorController {
@GetMapping("/pause")
public Result<Void> jobPause(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
- return jobExecutorService.jobPause(userId, jobInstanceId);
+ return jobExecutorService.jobPause(jobInstanceId);
}
@GetMapping("/restore")
public Result<Void> jobRestore(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
- return jobExecutorService.jobStore(userId, jobInstanceId);
+ return jobExecutorService.jobStore(jobInstanceId);
}
@GetMapping("/status")
@ApiOperation(value = "get job execution status", httpMethod = "GET")
Result<JobExecutionStatus> getJobExecutionStatus(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
- return taskInstanceService.getJobExecutionStatus(userId,
jobInstanceId);
+ return taskInstanceService.getJobExecutionStatus(jobInstanceId);
}
@GetMapping("/detail")
@ApiOperation(value = "get job execution status and some more details",
httpMethod = "GET")
Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
- return taskInstanceService.getJobExecutionDetail(userId,
jobInstanceId);
+ return taskInstanceService.getJobExecutionDetail(jobInstanceId);
}
@DeleteMapping("/delete")
@ApiOperation(value = "Deletes given job instance id", httpMethod =
"DELETE")
Result<Void> deleteJobInstance(
- @ApiParam(value = "userId", required = true)
@RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId) {
- return taskInstanceService.deleteJobInstanceById(userId,
jobInstanceId);
+ return taskInstanceService.deleteJobInstanceById(jobInstanceId);
}
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
index 8d1e3b3a..673f93aa 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetric
import org.apache.seatunnel.app.service.IJobMetricsService;
import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -48,31 +47,25 @@ public class JobMetricsController {
@GetMapping("/detail")
@ApiOperation(value = "get the job pipeline detail metrics", httpMethod =
"GET")
public Result<List<JobPipelineDetailMetricsRes>> detail(
- @ApiParam(value = "userId", required = true) @RequestAttribute
Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId)
throws IOException {
- return Result.success(
- jobMetricsService.getJobPipelineDetailMetricsRes(userId,
jobInstanceId));
+ return
Result.success(jobMetricsService.getJobPipelineDetailMetricsRes(jobInstanceId));
}
@GetMapping("/dag")
@ApiOperation(value = "get the job pipeline dag", httpMethod = "GET")
public Result<JobDAG> getJobDAG(
- @ApiParam(value = "userId", required = true) @RequestAttribute
Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId)
throws JsonProcessingException {
-
- return Result.success(jobMetricsService.getJobDAG(userId,
jobInstanceId));
+ return Result.success(jobMetricsService.getJobDAG(jobInstanceId));
}
@GetMapping("/summary")
@ApiOperation(value = "get the job pipeline summary metrics", httpMethod =
"GET")
public Result<List<JobPipelineSummaryMetricsRes>> summary(
- @ApiParam(value = "userId", required = true) @RequestAttribute
Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam
Long jobInstanceId)
throws IOException {
- return Result.success(
- jobMetricsService.getJobPipelineSummaryMetrics(userId,
jobInstanceId));
+ return
Result.success(jobMetricsService.getJobPipelineSummaryMetrics(jobInstanceId));
}
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
index 37746b96..6dfe70ce 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
@@ -112,14 +112,11 @@ public class SeatunnelDatasourceController extends
BaseController {
paramType = "query")
})
@PostMapping("/create")
- Result<String> createDatasource(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @RequestBody DatasourceReq req) {
+ Result<String> createDatasource(@RequestBody DatasourceReq req) {
String datasourceConfig = req.getDatasourceConfig();
Map<String, String> stringStringMap =
JsonUtils.toMap(datasourceConfig);
return Result.success(
datasourceService.createDatasource(
- loginUser.getId(),
req.getDatasourceName(),
req.getPluginName(),
DEFAULT_PLUGIN_VERSION,
@@ -143,15 +140,10 @@ public class SeatunnelDatasourceController extends
BaseController {
paramType = "query")
})
@PostMapping("/check/connect")
- Result<Boolean> testConnect(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @RequestBody DatasourceCheckReq req) {
+ Result<Boolean> testConnect(@RequestBody DatasourceCheckReq req) {
return Result.success(
datasourceService.testDatasourceConnectionAble(
- loginUser.getId(),
- req.getPluginName(),
- DEFAULT_PLUGIN_VERSION,
- req.getDatasourceConfig()));
+ req.getPluginName(), DEFAULT_PLUGIN_VERSION,
req.getDatasourceConfig()));
}
@ApiOperation(value = "update datasource", notes = "update datasource")
@@ -177,14 +169,11 @@ public class SeatunnelDatasourceController extends
BaseController {
})
@PutMapping("/{id}")
Result<Boolean> updateDatasource(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @PathVariable("id") String id,
- @RequestBody DatasourceReq req) {
+ @PathVariable("id") String id, @RequestBody DatasourceReq req) {
Map<String, String> stringStringMap =
JsonUtils.toMap(req.getDatasourceConfig());
Long datasourceId = Long.parseLong(id);
return Result.success(
datasourceService.updateDatasource(
- loginUser.getId(),
datasourceId,
req.getDatasourceName(),
req.getDescription(),
@@ -193,11 +182,9 @@ public class SeatunnelDatasourceController extends
BaseController {
@ApiOperation(value = "delete datasource by id", notes = "delete
datasource by id")
@DeleteMapping("/{id}")
- Result<Boolean> deleteDatasource(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @PathVariable("id") String id) {
+ Result<Boolean> deleteDatasource(@PathVariable("id") String id) {
Long datasourceId = Long.parseLong(id);
- return
Result.success(datasourceService.deleteDatasource(loginUser.getId(),
datasourceId));
+ return
Result.success(datasourceService.deleteDatasource(datasourceId));
}
@ApiOperation(value = "get datasource detail", notes = "get datasource
detail")
@@ -210,10 +197,8 @@ public class SeatunnelDatasourceController extends
BaseController {
paramType = "query"),
})
@GetMapping("/{id}")
- Result<DatasourceDetailRes> getDatasource(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @PathVariable("id") String id) {
- return
Result.success(datasourceService.queryDatasourceDetailById(loginUser.getId(),
id));
+ Result<DatasourceDetailRes> getDatasource(@PathVariable("id") String id) {
+ return Result.success(datasourceService.queryDatasourceDetailById(id));
}
@ApiOperation(value = "get datasource list", notes = "get datasource list")
@@ -245,14 +230,12 @@ public class SeatunnelDatasourceController extends
BaseController {
})
@GetMapping("/list")
Result<PageInfo<DatasourceRes>> getDatasourceList(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam("searchVal") String searchVal,
@RequestParam("pluginName") String pluginName,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
PageInfo<DatasourceRes> datasourceResPageInfo =
- datasourceService.queryDatasourceList(
- loginUser.getId(), searchVal, pluginName, pageNo,
pageSize);
+ datasourceService.queryDatasourceList(searchVal, pluginName,
pageNo, pageSize);
if (CollectionUtils.isNotEmpty(datasourceResPageInfo.getData())) {
Map<Integer, String> userIdNameMap = userIdNameMap();
datasourceResPageInfo
@@ -361,7 +344,6 @@ public class SeatunnelDatasourceController extends
BaseController {
@PostMapping("/schemas")
Result<List<DatabaseTableFields>> getMultiTableFields(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam("datasourceId") String datasourceId,
@RequestBody List<DatabaseTables> tableNames) {
DatasourceDetailRes res =
datasourceService.queryDatasourceDetailById(datasourceId);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
index bac75873..128c2e61 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/TaskInstanceController.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.common.constants.JobMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -41,7 +40,6 @@ public class TaskInstanceController {
@GetMapping("/jobMetrics")
@ApiOperation(value = "get the jobMetrics list ", httpMethod = "GET")
public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
- @RequestAttribute(name = "userId") Integer userId,
@RequestParam(name = "taskName", required = false) String
jobDefineName,
@RequestParam(name = "executorName", required = false) String
executorName,
@RequestParam(name = "stateType", required = false) String
stateType,
@@ -51,7 +49,6 @@ public class TaskInstanceController {
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
return taskInstanceService.getSyncTaskInstancePaging(
- userId,
jobDefineName,
executorName,
stateType,
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/VirtualTableController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/VirtualTableController.java
index eeba37d9..2870e572 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/VirtualTableController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/VirtualTableController.java
@@ -71,10 +71,8 @@ public class VirtualTableController extends BaseController {
dataType = "VirtualTableReq")
})
@PostMapping("/create")
- Result<String> createVirtualTable(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @RequestBody VirtualTableReq tableReq) {
- return
Result.success(virtualTableService.createVirtualTable(loginUser.getId(),
tableReq));
+ Result<String> createVirtualTable(@RequestBody VirtualTableReq tableReq) {
+ return
Result.success(virtualTableService.createVirtualTable(tableReq));
}
@ApiOperation(value = "update virtual table", httpMethod = "PUT")
@@ -92,11 +90,8 @@ public class VirtualTableController extends BaseController {
})
@PutMapping("/{id}")
Result<Boolean> updateVirtualTable(
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
- @PathVariable("id") String id,
- @RequestBody VirtualTableReq tableReq) {
- return Result.success(
- virtualTableService.updateVirtualTable(loginUser.getId(), id,
tableReq));
+ @PathVariable("id") String id, @RequestBody VirtualTableReq
tableReq) {
+ return Result.success(virtualTableService.updateVirtualTable(id,
tableReq));
}
@ApiOperation(value = "check virtual table valid", httpMethod = "GET")
@@ -131,10 +126,8 @@ public class VirtualTableController extends BaseController
{
dataType = "String")
})
@DeleteMapping("/{id}")
- Result<Boolean> deleteVirtualTable(
- @PathVariable("id") String id,
- @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser)
{
- return
Result.success(virtualTableService.deleteVirtualTable(loginUser.getId(), id));
+ Result<Boolean> deleteVirtualTable(@PathVariable("id") String id) {
+ return Result.success(virtualTableService.deleteVirtualTable(id));
}
@ApiOperation(value = "query virtual table detail by id", httpMethod =
"GET")
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/AuthenticationInterceptor.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/AuthenticationInterceptor.java
index 43cb15cf..d542293f 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/AuthenticationInterceptor.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/AuthenticationInterceptor.java
@@ -96,8 +96,10 @@ public class AuthenticationInterceptor implements
HandlerInterceptor {
User user = new User();
user.setUsername((String) map.get("name"));
user.setId((Integer) map.get("id"));
- // user.setStatus((Byte) map.get("status"));
- // user.setType((Byte) map.get("type"));
+ log.debug(
+ "Setting user to request attributes: userId={}, username={}",
+ user.getId(),
+ user.getUsername());
request.setAttribute(Constants.SESSION_USER, user);
request.setAttribute("userId", userId);
return true;
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/UserContextInterceptor.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/UserContextInterceptor.java
new file mode 100644
index 00000000..e8477cc0
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/interceptor/UserContextInterceptor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.interceptor;
+
+import org.apache.seatunnel.app.dal.entity.User;
+import org.apache.seatunnel.app.security.UserContext;
+
+import org.springframework.web.servlet.HandlerInterceptor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import static org.apache.seatunnel.app.common.Constants.SESSION_USER;
+
+/**
+ * Interceptor that binds the request's user to {@link UserContext} while
+ * the request is processed and releases it when the request completes.
+ *
+ * <p>The user is read from the {@code SESSION_USER} request attribute, so
+ * a user is only available if that attribute was populated before this
+ * interceptor runs.
+ */
+@Slf4j
+public class UserContextInterceptor implements HandlerInterceptor {
+
+    /**
+     * Sets up the user context before request processing.
+     *
+     * <p>If the request carries a user, it is stored in
+     * {@link UserContext}. If it does not, the context is cleared so that
+     * a value left behind on a reused thread cannot be picked up by this
+     * request.
+     *
+     * @param request Current HTTP request
+     * @param response Current HTTP response
+     * @param handler Chosen handler to execute
+     * @return always {@code true}; a missing user is logged but does not
+     *     stop processing
+     */
+    @Override
+    public boolean preHandle(
+            HttpServletRequest request,
+            HttpServletResponse response,
+            Object handler) {
+        User user = (User) request.getAttribute(SESSION_USER);
+        if (user != null) {
+            log.debug("Setting user context for user: {}", user.getUsername());
+            UserContext.setUser(user);
+        } else {
+            log.warn("No user found in request attributes");
+            // Defensive: never let this request observe a user that an
+            // earlier task left on the same thread.
+            UserContext.clear();
+        }
+        return true;
+    }
+
+    /**
+     * Cleans up the user context after request completion so the value
+     * does not outlive the request on the current thread.
+     *
+     * @param request Current HTTP request
+     * @param response Current HTTP response
+     * @param handler Handler that was executed
+     * @param ex Exception that was thrown during handler execution
+     */
+    @Override
+    public void afterCompletion(
+            HttpServletRequest request,
+            HttpServletResponse response,
+            Object handler,
+            Exception ex) {
+        log.debug("Clearing user context");
+        UserContext.clear();
+    }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/security/UserContext.java
similarity index 59%
copy from
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
copy to
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/security/UserContext.java
index b10a448a..7e42fabd 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/security/UserContext.java
@@ -14,16 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.app.service;
+package org.apache.seatunnel.app.security;
-import org.apache.seatunnel.app.domain.request.job.JobConfig;
-import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
+import org.apache.seatunnel.app.dal.entity.User;
-import com.fasterxml.jackson.core.JsonProcessingException;
+/**
+ * Per-thread holder for the user on whose behalf the current work runs.
+ *
+ * <p>The value is kept in a {@link ThreadLocal}, so it is only visible to
+ * the thread that called {@link #setUser(User)}.
+ */
+public class UserContext {
+    private static final ThreadLocal<User> userHolder = new ThreadLocal<>();
-public interface IJobConfigService {
- JobConfigRes getJobConfig(long jobVersionIdId) throws
JsonProcessingException;
+    /** Binds {@code user} to the calling thread, replacing any prior value. */
+    public static void setUser(User user) {
+        userHolder.set(user);
+    }
- void updateJobConfig(int userId, long jobVersionId, JobConfig jobConfig)
- throws JsonProcessingException;
+    /**
+     * Returns the user bound to the calling thread.
+     *
+     * @return the bound user, never {@code null}
+     * @throws IllegalStateException if no user is bound to this thread
+     */
+    public static User getUser() {
+        User user = userHolder.get();
+        if (user == null) {
+            // Still a RuntimeException, so existing catch sites keep working.
+            throw new IllegalStateException("User context not found");
+        }
+        return user;
+    }
+
+    /** Removes the calling thread's binding, if any. */
+    public static void clear() {
+        userHolder.remove();
+    }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IDatasourceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IDatasourceService.java
index caf593d8..afac531e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IDatasourceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IDatasourceService.java
@@ -33,7 +33,6 @@ public interface IDatasourceService {
/**
* create datasource
*
- * @param userId userid
* @param datasourceName is required //todo datasourceName global is
required
* @param pluginName is required
* @param pluginVersion is required
@@ -42,7 +41,6 @@ public interface IDatasourceService {
* @return datasourceId
*/
String createDatasource(
- Integer userId,
String datasourceName,
String pluginName,
String pluginVersion,
@@ -53,7 +51,6 @@ public interface IDatasourceService {
/**
* update datasource
*
- * @param userId userid
* @param datasourceId datasource id
* @param datasourceName datasourceName
* @param description description
@@ -61,7 +58,6 @@ public interface IDatasourceService {
* @return boolean
*/
boolean updateDatasource(
- Integer userId,
Long datasourceId,
String datasourceName,
String description,
@@ -70,66 +66,49 @@ public interface IDatasourceService {
/**
* delete datasource
*
- * @param userId userId
* @param datasourceId datasourceId
* @return boolean
*/
- boolean deleteDatasource(Integer userId, Long datasourceId);
+ boolean deleteDatasource(Long datasourceId);
/**
* test datasource is used
*
- * @param userId userId
* @param pluginName pluginName
* @param pluginVersion pluginVersion default is 1.0.0
* @param datasourceConfig datasourceConfig
* @return boolean
*/
boolean testDatasourceConnectionAble(
- Integer userId,
- String pluginName,
- String pluginVersion,
- Map<String, String> datasourceConfig);
+ String pluginName, String pluginVersion, Map<String, String>
datasourceConfig);
/**
* test datasource is used
*
- * @param userId userId
* @param datasourceId datasourceId
* @return boolean
*/
- boolean testDatasourceConnectionAble(Integer userId, Long datasourceId);
+ boolean testDatasourceConnectionAble(Long datasourceId);
/**
* checkDatasourceNameUnique
*
- * @param userId userId
* @param datasourceName datasourceName
* @param dataSourceId dataSourceId
* @return boolean
*/
- boolean checkDatasourceNameUnique(Integer userId, String datasourceName,
Long dataSourceId);
+ boolean checkDatasourceNameUnique(String datasourceName, Long
dataSourceId);
/**
* queryDatasourceList
*
- * @param userId userId
* @param pluginName pluginName
* @param pageNo pageNo
* @param pageSize pageSize
* @return PageInfo DatasourceRes
*/
PageInfo<DatasourceRes> queryDatasourceList(
- Integer userId, String searchVal, String pluginName, Integer
pageNo, Integer pageSize);
-
- /**
- * datasourceId query detail
- *
- * @param userId userId
- * @param datasourceId datasourceId
- * @return DatasourceDetailRes
- */
- DatasourceDetailRes queryDatasourceDetailById(Integer userId, String
datasourceId);
+ String searchVal, String pluginName, Integer pageNo, Integer
pageSize);
/**
* datasourceId query detail
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
index b10a448a..23891174 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobConfigService.java
@@ -24,6 +24,5 @@ import com.fasterxml.jackson.core.JsonProcessingException;
public interface IJobConfigService {
JobConfigRes getJobConfig(long jobVersionIdId) throws
JsonProcessingException;
- void updateJobConfig(int userId, long jobVersionId, JobConfig jobConfig)
- throws JsonProcessingException;
+ void updateJobConfig(long jobVersionId, JobConfig jobConfig) throws
JsonProcessingException;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobDefinitionService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobDefinitionService.java
index 75e96f45..416b210b 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobDefinitionService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobDefinitionService.java
@@ -31,7 +31,7 @@ import java.util.Map;
public interface IJobDefinitionService {
- long createJob(int userId, JobReq jobReq) throws
CodeGenerateUtils.CodeGenerateException;
+ long createJob(JobReq jobReq) throws
CodeGenerateUtils.CodeGenerateException;
PageInfo<JobDefinitionRes> getJob(String name, Integer pageNo, Integer
pageSize);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
index ad227422..fd6ea2c6 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
@@ -22,9 +22,9 @@ import
org.apache.seatunnel.app.domain.request.job.JobExecParam;
public interface IJobExecutorService {
- Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam
executeParam);
+ Result<Long> jobExecute(Long jobDefineId, JobExecParam executeParam);
- Result<Void> jobPause(Integer userId, Long jobInstanceId);
+ Result<Void> jobPause(Long jobInstanceId);
- Result<Void> jobStore(Integer userId, Long jobInstanceId);
+ Result<Void> jobStore(Long jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
index e971452e..de607adc 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java
@@ -28,8 +28,7 @@ import lombok.NonNull;
import java.util.List;
public interface IJobInstanceService {
- JobExecutorRes createExecuteResource(
- @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam
executeParam);
+ JobExecutorRes createExecuteResource(@NonNull Long jobDefineId,
JobExecParam executeParam);
String generateJobConfig(
Long jobId,
@@ -40,9 +39,5 @@ public interface IJobInstanceService {
JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);
- void complete(
- @NonNull Integer userId,
- @NonNull Long jobInstanceId,
- @NonNull String jobEngineId,
- JobResult jobResult);
+ void complete(@NonNull Long jobInstanceId, @NonNull String jobEngineId,
JobResult jobResult);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
index 8a885d13..4fbaca40 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
@@ -34,25 +34,20 @@ import java.util.Map;
public interface IJobMetricsService {
- List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(
- @NonNull Integer userId, @NonNull Long jobInstanceId);
+ List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(@NonNull
Long jobInstanceId);
- List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
- @NonNull Integer userId, @NonNull Long jobInstanceId);
+ List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(@NonNull
Long jobInstanceId);
- JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long jobInstanceId)
- throws JsonProcessingException;
+ JobDAG getJobDAG(@NonNull Long jobInstanceId) throws
JsonProcessingException;
ImmutablePair<Long, String> getInstanceIdAndEngineId(@NonNull String key);
- void syncJobDataToDb(
- @NonNull JobInstance jobInstance, @NonNull Integer userId,
@NonNull String jobEngineId);
+ void syncJobDataToDb(@NonNull JobInstance jobInstance, @NonNull String
jobEngineId);
JobSummaryMetricsRes getJobSummaryMetrics(
- @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull
String jobEngineId);
+ @NonNull Long jobInstanceId, @NonNull String jobEngineId);
Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
- @NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull JobMode jobMode);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
index e8e5904b..477e4c5a 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java
@@ -23,10 +23,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
public interface IJobService {
- long createJob(int userId, JobCreateReq jobCreateRequest) throws
JsonProcessingException;
+ long createJob(JobCreateReq jobCreateRequest) throws
JsonProcessingException;
- void updateJob(Integer userId, long jobId, JobCreateReq jobCreateReq)
- throws JsonProcessingException;
+ void updateJob(long jobId, JobCreateReq jobCreateReq) throws
JsonProcessingException;
- JobRes getJob(Integer userId, long jobId) throws JsonProcessingException;
+ JobRes getJob(long jobId) throws JsonProcessingException;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
index 7ba561f7..f7dd3970 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.common.constants.JobMode;
public interface ITaskInstanceService<T> {
Result<PageInfo<T>> getSyncTaskInstancePaging(
- Integer userId,
String jobDefineName,
String executorName,
String stateType,
@@ -36,9 +35,9 @@ public interface ITaskInstanceService<T> {
Integer pageNo,
Integer pageSize);
- Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long
jobInstanceId);
+ Result<JobExecutionStatus> getJobExecutionStatus(long jobInstanceId);
- Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Integer userId, long
jobInstanceId);
+ Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(long jobInstanceId);
- Result<Void> deleteJobInstanceById(Integer userId, long jobInstanceId);
+ Result<Void> deleteJobInstanceById(long jobInstanceId);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IVirtualTableService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IVirtualTableService.java
index 80e9b808..87c8b999 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IVirtualTableService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IVirtualTableService.java
@@ -27,12 +27,12 @@ import java.util.List;
public interface IVirtualTableService {
- String createVirtualTable(Integer userId, VirtualTableReq req)
+ String createVirtualTable(VirtualTableReq tableReq)
throws CodeGenerateUtils.CodeGenerateException;
- Boolean updateVirtualTable(Integer userId, String tableId, VirtualTableReq
req);
+ boolean updateVirtualTable(String tableId, VirtualTableReq tableReq);
- Boolean deleteVirtualTable(Integer userId, String tableId);
+ boolean deleteVirtualTable(String tableId);
Boolean checkVirtualTableValid(VirtualTableReq req);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
index 403e0420..12b1a4f1 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.ITableSchemaService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory;
import
org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
@@ -95,13 +96,13 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public String createDatasource(
- Integer userId,
String datasourceName,
String pluginName,
String pluginVersion,
String description,
Map<String, String> datasourceConfig)
throws CodeGenerateUtils.CodeGenerateException {
+ Integer userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_CREATE,
userId);
long uuid = CodeGenerateUtils.getInstance().genCode();
boolean unique =
datasourceDao.checkDatasourceNameUnique(datasourceName, 0L);
@@ -140,7 +141,6 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public boolean updateDatasource(
- Integer userId,
Long datasourceId,
String datasourceName,
String description,
@@ -149,7 +149,7 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
SeatunnelFuncPermissionKeyConstant.DATASOURCE_UPDATE,
SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
Collections.singletonList(datasourceId),
- userId);
+ ServletUtils.getCurrentUserId());
if (datasourceId == null) {
throw new SeatunnelException(
SeatunnelErrorEnum.DATASOURCE_PRAM_NOT_ALLOWED_NULL,
"datasourceId");
@@ -167,7 +167,7 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
SeatunnelErrorEnum.DATASOURCE_NAME_ALREADY_EXISTS,
datasourceName);
}
}
- datasource.setUpdateUserId(userId);
+ datasource.setUpdateUserId(ServletUtils.getCurrentUserId());
datasource.setUpdateTime(new Date());
datasource.setDescription(description);
if (MapUtils.isNotEmpty(datasourceConfig)) {
@@ -178,13 +178,13 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
}
@Override
- public boolean deleteDatasource(Integer userId, Long datasourceId) {
+ public boolean deleteDatasource(Long datasourceId) {
// check role permission
funcAndResourcePermissionCheck(
SeatunnelFuncPermissionKeyConstant.DATASOURCE_DELETE,
SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
Collections.singletonList(datasourceId),
- userId);
+ ServletUtils.getCurrentUserId());
// check has job task has used this datasource
List<JobTask> jobTaskList =
jobTaskDao.getJobTaskByDataSourceId(datasourceId);
if (!CollectionUtils.isEmpty(jobTaskList)) {
@@ -203,22 +203,21 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public boolean testDatasourceConnectionAble(
- Integer userId,
- String pluginName,
- String pluginVersion,
- Map<String, String> datasourceConfig) {
-
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT,
userId);
+ String pluginName, String pluginVersion, Map<String, String>
datasourceConfig) {
+ funcPermissionCheck(
+ SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT,
+ ServletUtils.getCurrentUserId());
return DataSourceClientFactory.getDataSourceClient()
.checkDataSourceConnectivity(pluginName, datasourceConfig);
}
@Override
- public boolean testDatasourceConnectionAble(Integer userId, Long
datasourceId) {
+ public boolean testDatasourceConnectionAble(Long datasourceId) {
funcAndResourcePermissionCheck(
SeatunnelFuncPermissionKeyConstant.DATASOURCE_TEST_CONNECT,
SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
Collections.singletonList(datasourceId),
- userId);
+ ServletUtils.getCurrentUserId());
Datasource datasource =
datasourceDao.selectDatasourceById(datasourceId);
if (datasource == null) {
throw new SeatunnelException(
@@ -255,8 +254,7 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
}
@Override
- public boolean checkDatasourceNameUnique(
- Integer userId, String datasourceName, Long dataSourceId) {
+ public boolean checkDatasourceNameUnique(String datasourceName, Long
dataSourceId) {
if (StringUtils.isNotBlank(datasourceName)) {
return datasourceDao.checkDatasourceNameUnique(datasourceName,
dataSourceId);
}
@@ -389,7 +387,8 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public PageInfo<DatasourceRes> queryDatasourceList(
- Integer userId, String searchVal, String pluginName, Integer
pageNo, Integer pageSize) {
+ String searchVal, String pluginName, Integer pageNo, Integer
pageSize) {
+ Integer userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.DATASOURCE_LIST, userId);
Page<Datasource> page = new Page<>(pageNo, pageSize);
PageInfo<DatasourceRes> pageInfo = new PageInfo<>();
@@ -620,17 +619,6 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
return getDatasourceDetailRes(datasource);
}
- @Override
- public DatasourceDetailRes queryDatasourceDetailById(Integer userId,
String datasourceId) {
- // check user
- funcAndResourcePermissionCheck(
- SeatunnelFuncPermissionKeyConstant.DATASOURCE_DETAIL,
- SeatunnelResourcePermissionModuleEnum.DATASOURCE.name(),
- Collections.singletonList(Long.parseLong(datasourceId)),
- userId);
- return this.queryDatasourceDetailById(datasourceId);
- }
-
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
index 4e7cde97..97bf1884 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
import
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IJobConfigService;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -69,8 +70,9 @@ public class JobConfigServiceImpl extends
SeatunnelBaseServiceImpl implements IJ
@Override
@Transactional
- public void updateJobConfig(int userId, long jobVersionId, JobConfig
jobConfig)
+ public void updateJobConfig(long jobVersionId, JobConfig jobConfig)
throws JsonProcessingException {
+ Integer userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_CONFIG_UPDATE, 0);
JobVersion version = jobVersionDao.getVersionById(jobVersionId);
if (version == null) {
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
index b97a55ff..07c07125 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobDefinitionServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.app.domain.response.PageInfo;
import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;
import
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IJobDefinitionService;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
@@ -71,8 +72,8 @@ public class JobDefinitionServiceImpl extends
SeatunnelBaseServiceImpl
@Override
@Transactional
- public long createJob(int userId, JobReq jobReq)
- throws CodeGenerateUtils.CodeGenerateException {
+ public long createJob(JobReq jobReq) throws
CodeGenerateUtils.CodeGenerateException {
+ Integer userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_DEFINITION_CREATE,
userId);
long uuid = CodeGenerateUtils.getInstance().genCode();
jobDefinitionDao.add(
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index 185222eb..0369931e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -42,6 +42,8 @@ import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service;
import com.hazelcast.client.config.ClientConfig;
@@ -65,18 +67,19 @@ import java.util.concurrent.Executors;
public class JobExecutorServiceImpl implements IJobExecutorService {
@Resource private IJobInstanceService jobInstanceService;
@Resource private IJobInstanceDao jobInstanceDao;
+ @Autowired private AsyncTaskExecutor taskExecutor;
@Override
- public Result<Long> jobExecute(Integer userId, Long jobDefineId,
JobExecParam executeParam) {
+ public Result<Long> jobExecute(Long jobDefineId, JobExecParam
executeParam) {
JobExecutorRes executeResource =
- jobInstanceService.createExecuteResource(userId, jobDefineId,
executeParam);
+ jobInstanceService.createExecuteResource(jobDefineId,
executeParam);
String jobConfig = executeResource.getJobConfig();
String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);
try {
- executeJobBySeaTunnel(userId, configFile,
executeResource.getJobInstanceId());
+ executeJobBySeaTunnel(configFile,
executeResource.getJobInstanceId());
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
@@ -112,7 +115,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
return filePath;
}
- private void executeJobBySeaTunnel(Integer userId, String filePath, Long
jobInstanceId) {
+ private void executeJobBySeaTunnel(String filePath, Long jobInstanceId) {
Common.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
@@ -137,20 +140,20 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstanceDao.update(jobInstance);
+ // Use Spring thread pool to handle asynchronous user retrieval
CompletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
- userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
- });
+ },
+ taskExecutor);
}
public void waitJobFinish(
ClientJobProxy clientJobProxy,
- Integer userId,
Long jobInstanceId,
String jobEngineId,
SeaTunnelClient seaTunnelClient) {
@@ -170,7 +173,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
} finally {
seaTunnelClient.close();
log.info("and jobInstanceService.complete begin");
- jobInstanceService.complete(userId, jobInstanceId, jobEngineId,
jobResult);
+ jobInstanceService.complete(jobInstanceId, jobEngineId, jobResult);
}
}
@@ -180,7 +183,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
}
@Override
- public Result<Void> jobPause(Integer userId, Long jobInstanceId) {
+ public Result<Void> jobPause(Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId())
== JobStatus.RUNNING) {
@@ -204,7 +207,7 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
}
@Override
- public Result<Void> jobStore(Integer userId, Long jobInstanceId) {
+ public Result<Void> jobStore(Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
String projectRoot = System.getProperty("user.dir");
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 6b1e97af..e7883395 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -62,6 +62,7 @@ import
org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUt
import
org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -125,7 +126,8 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public JobExecutorRes createExecuteResource(
- @NonNull Integer userId, @NonNull Long jobDefineId, JobExecParam
executeParam) {
+ @NonNull Long jobDefineId, JobExecParam executeParam) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_RESOURCE,
userId);
log.info(
"receive createExecuteResource request, userId:{},
jobDefineId:{}",
@@ -349,13 +351,11 @@ public class JobInstanceServiceImpl extends
SeatunnelBaseServiceImpl
@Override
public void complete(
- @NonNull Integer userId,
- @NonNull Long jobInstanceId,
- @NonNull String jobEngineId,
- JobResult jobResult) {
+ @NonNull Long jobInstanceId, @NonNull String jobEngineId,
JobResult jobResult) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE,
userId);
JobInstance jobInstance =
jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
- jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
+ jobMetricsService.syncJobDataToDb(jobInstance, jobEngineId);
jobInstance.setJobStatus(jobResult.getStatus());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
index feb2c225..ade203b3 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -73,17 +74,18 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
@Override
public List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(
- @NonNull Integer userId, @NonNull Long jobInstanceId) {
+ @NonNull Long jobInstanceId) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY,
userId);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
- List<JobMetrics> jobPipelineDetailMetrics =
- getJobPipelineDetailMetrics(jobInstance, userId);
+ List<JobMetrics> jobPipelineDetailMetrics =
getJobPipelineDetailMetrics(jobInstance);
return summaryMetrics(jobPipelineDetailMetrics);
}
@Override
public JobSummaryMetricsRes getJobSummaryMetrics(
- @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull
String jobEngineId) {
+ @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY,
userId);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
Engine engine = new Engine(jobInstance.getEngineName(),
jobInstance.getEngineVersion());
@@ -91,8 +93,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
(new
EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();
JobStatus jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId);
- List<JobMetrics> jobPipelineDetailMetrics =
- getJobPipelineDetailMetrics(jobInstance, userId);
+ List<JobMetrics> jobPipelineDetailMetrics =
getJobPipelineDetailMetrics(jobInstance);
long readCount =
jobPipelineDetailMetrics.stream().mapToLong(JobMetrics::getReadRowCount).sum();
long writeCount =
@@ -104,12 +105,11 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
@Override
public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(
- @NonNull Integer userId,
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull JobMode jobMode) {
log.info("jobInstanceIdAndJobEngineIdMap={}",
jobInstanceIdAndJobEngineIdMap);
-
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY,
userId);
List<JobInstance> allJobInstance =
jobInstanceDao.getAllJobInstance(jobInstanceIdList);
if (allJobInstance.isEmpty()) {
@@ -128,14 +128,12 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
result =
getMatricsListIfTaskTypeIsBatch(
allJobInstance,
- userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
} else if (JobMode.STREAMING == jobMode) {
result =
getMatricsListIfTaskTypeIsStreaming(
allJobInstance,
- userId,
allRunningJobMetricsFromEngine,
jobInstanceIdAndJobEngineIdMap);
}
@@ -146,7 +144,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
List<JobInstance> allJobInstance,
- Integer userId,
Map<Long, HashMap<Integer, JobMetrics>>
allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
@@ -175,8 +172,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
- jobInstanceIdAndJobEngineIdMap,
- userId);
+ jobInstanceIdAndJobEngineIdMap);
} else {
log.info(
@@ -184,7 +180,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
- userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
@@ -205,7 +200,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
- userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())));
log.info("jobStatus=finish oe
canceled,JobSummaryMetricsRes={}", jobMetricsFromDb);
@@ -219,8 +213,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
private void modifyAndUpdateJobInstanceAndJobMetrics(
JobInstance jobInstance,
Map<Long, HashMap<Integer, JobMetrics>>
allRunningJobMetricsFromEngine,
- Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
- Integer userId) {
+ Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
jobInstance.setJobStatus(JobStatus.RUNNING);
HashMap<Integer, JobMetrics> jobMetricsFromEngine =
allRunningJobMetricsFromEngine.get(
@@ -230,7 +223,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
if (jobMetricsFromDb.isEmpty()) {
log.info("002jobMetricsFromDb == null");
- syncMetricsToDbRunning(jobInstance, userId, jobMetricsFromEngine);
+ syncMetricsToDbRunning(jobInstance, jobMetricsFromEngine);
jobInstanceDao.update(jobInstance);
} else {
jobMetricsFromDb.forEach(
@@ -253,7 +246,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
private Map<Long, JobSummaryMetricsRes>
getMatricsListIfTaskTypeIsStreaming(
List<JobInstance> allJobInstance,
- Integer userId,
Map<Long, HashMap<Integer, JobMetrics>>
allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap) {
@@ -270,7 +262,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
- userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
@@ -290,8 +281,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
- jobInstanceIdAndJobEngineIdMap,
- userId);
+ jobInstanceIdAndJobEngineIdMap);
// Return data from the front-end
JobSummaryMetricsRes jobMetricsFromEngineRes =
@@ -305,7 +295,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
JobSummaryMetricsRes jobMetricsFromDb =
getJobSummaryMetricsResByDb(
jobInstance,
- userId,
Long.toString(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
@@ -321,8 +310,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
modifyAndUpdateJobInstanceAndJobMetrics(
jobInstance,
allRunningJobMetricsFromEngine,
- jobInstanceIdAndJobEngineIdMap,
- userId);
+ jobInstanceIdAndJobEngineIdMap);
// Return data from the front-end
JobSummaryMetricsRes jobMetricsFromEngineRes =
getRunningJobMetricsFromEngine(
@@ -350,7 +338,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
JobSummaryMetricsRes jobSummaryMetricsResByDb =
getJobSummaryMetricsResByDb(
jobInstance,
- userId,
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
@@ -359,7 +346,6 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
List<JobMetrics> jobMetricsFromDb =
getJobMetricsFromDb(
jobInstance,
- userId,
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
@@ -406,8 +392,8 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private JobSummaryMetricsRes getJobSummaryMetricsResByDb(
- JobInstance jobInstance, Integer userId, String jobEngineId) {
- List<JobMetrics> jobMetricsFromDb = getJobMetricsFromDb(jobInstance,
userId, jobEngineId);
+ JobInstance jobInstance, String jobEngineId) {
+ List<JobMetrics> jobMetricsFromDb = getJobMetricsFromDb(jobInstance,
jobEngineId);
if (!jobMetricsFromDb.isEmpty()) {
long readCount =
jobMetricsFromDb.stream().mapToLong(JobMetrics::getReadRowCount).sum();
long writeCount =
@@ -458,16 +444,15 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
return
engineMetricsExtractor.getMetricsByJobEngineIdRTMap(jobEngineId);
}
- private List<JobMetrics> getJobPipelineDetailMetrics(
- @NonNull JobInstance jobInstance, @NonNull Integer userId) {
+ private List<JobMetrics> getJobPipelineDetailMetrics(@NonNull JobInstance
jobInstance) {
List<JobMetrics> jobMetrics;
if (JobUtils.isJobEndStatus(jobInstance.getJobStatus())) {
- jobMetrics = getJobMetricsFromDb(jobInstance, userId,
jobInstance.getJobEngineId());
+ jobMetrics = getJobMetricsFromDb(jobInstance,
jobInstance.getJobEngineId());
if (CollectionUtils.isEmpty(jobMetrics)) {
jobMetrics = getJobMetricsFromEngine(jobInstance,
jobInstance.getJobEngineId());
if (!jobMetrics.isEmpty()) {
// If engine returns some metrics then it makes sens to
insert into database
- syncMetricsToDb(jobInstance, userId,
jobInstance.getJobEngineId());
+ syncMetricsToDb(jobInstance, jobInstance.getJobEngineId());
}
}
} else {
@@ -479,22 +464,23 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
@Override
public List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
- @NonNull Integer userId, @NonNull Long jobInstanceId) {
+ @NonNull Long jobInstanceId) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_DETAIL,
userId);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
- List<JobMetrics> jobPipelineDetailMetrics =
- getJobPipelineDetailMetrics(jobInstance, userId);
+ List<JobMetrics> jobPipelineDetailMetrics =
getJobPipelineDetailMetrics(jobInstance);
return jobPipelineDetailMetrics.stream()
.map(this::wrapperJobMetrics)
.collect(Collectors.toList());
}
@Override
- public JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long
jobInstanceId) {
+ public JobDAG getJobDAG(@NonNull Long jobInstanceId) {
+ int userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_DAG,
userId);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
String jobEngineId = jobInstance.getJobEngineId();
- JobInstanceHistory history = getJobHistoryFromDb(jobInstance, userId,
jobEngineId);
+ JobInstanceHistory history = getJobHistoryFromDb(jobInstance,
jobEngineId);
if (history != null) {
String dag = history.getDag();
return JsonUtils.parseObject(dag, JobDAG.class);
@@ -505,7 +491,7 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
if (engineMetricsExtractor.isJobEnd(jobEngineId)) {
syncHistoryJobInfoToDb(jobInstance, jobEngineId);
- history = getJobHistoryFromDb(jobInstance, userId, jobEngineId);
+ history = getJobHistoryFromDb(jobInstance, jobEngineId);
} else {
history = getJobHistoryFromEngine(jobInstance, jobEngineId);
}
@@ -528,31 +514,25 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private JobInstanceHistory getJobHistoryFromDb(
- @NonNull JobInstance jobInstance, Integer userId, String
jobEngineId) {
+ @NonNull JobInstance jobInstance, String jobEngineId) {
// relation jobInstanceId and jobEngineId
- relationJobInstanceAndJobEngineId(jobInstance, userId, jobEngineId);
+ relationJobInstanceAndJobEngineId(jobInstance, jobEngineId);
return jobInstanceHistoryDao.getByInstanceId(jobInstance.getId());
}
@Override
- public void syncJobDataToDb(
- @NonNull JobInstance jobInstance,
- @NonNull Integer userId,
- @NonNull String jobEngineId) {
- relationJobInstanceAndJobEngineId(jobInstance, userId, jobEngineId);
- syncMetricsToDb(jobInstance, userId, jobEngineId);
+ public void syncJobDataToDb(@NonNull JobInstance jobInstance, @NonNull
String jobEngineId) {
+ relationJobInstanceAndJobEngineId(jobInstance, jobEngineId);
+ syncMetricsToDb(jobInstance, jobEngineId);
syncHistoryJobInfoToDb(jobInstance, jobEngineId);
syncCompleteJobInfoToDb(jobInstance);
}
- private void syncMetricsToDb(
- @NonNull JobInstance jobInstance,
- @NonNull Integer userId,
- @NonNull String jobEngineId) {
+ private void syncMetricsToDb(@NonNull JobInstance jobInstance, @NonNull
String jobEngineId) {
Map<Integer, JobMetrics> jobMetricsFromEngineMap =
getJobMetricsFromEngineMap(jobInstance, jobEngineId);
-
- List<JobMetrics> jobMetricsFromDb = getJobMetricsFromDb(jobInstance,
userId, jobEngineId);
+ int userId = ServletUtils.getCurrentUserId();
+ List<JobMetrics> jobMetricsFromDb = getJobMetricsFromDb(jobInstance,
jobEngineId);
if (jobMetricsFromDb.isEmpty()) {
List<JobMetrics> jobMetricsFromEngine =
Arrays.asList(jobMetricsFromEngineMap.values().toArray(new
JobMetrics[0]));
@@ -614,11 +594,10 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private void relationJobInstanceAndJobEngineId(
- @NonNull JobInstance jobInstance,
- @NonNull Integer userId,
- @NonNull String jobEngineId) {
+ @NonNull JobInstance jobInstance, @NonNull String jobEngineId) {
// relation jobInstanceId and jobEngineId
if (StringUtils.isEmpty(jobInstance.getJobEngineId())) {
+ int userId = ServletUtils.getCurrentUserId();
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstanceDao.update(jobInstance);
@@ -649,12 +628,10 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private List<JobMetrics> getJobMetricsFromDb(
- @NonNull JobInstance jobInstance,
- @NonNull Integer userId,
- @NonNull String jobEngineId) {
+ @NonNull JobInstance jobInstance, @NonNull String jobEngineId) {
// relation jobInstanceId and jobEngineId
- relationJobInstanceAndJobEngineId(jobInstance, userId, jobEngineId);
+ relationJobInstanceAndJobEngineId(jobInstance, jobEngineId);
// get metrics from db
return jobMetricsDao.getByInstanceId(jobInstance.getId());
@@ -688,10 +665,8 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
}
private void syncMetricsToDbRunning(
- @NonNull JobInstance jobInstance,
- @NonNull Integer userId,
- @NonNull Map<Integer, JobMetrics> jobMetricsMap) {
-
+ @NonNull JobInstance jobInstance, @NonNull Map<Integer,
JobMetrics> jobMetricsMap) {
+ int userId = ServletUtils.getCurrentUserId();
ArrayList<JobMetrics> list = new ArrayList<>();
for (Map.Entry<Integer, JobMetrics> entry : jobMetricsMap.entrySet()) {
JobMetrics jobMetrics = entry.getValue();
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
index 8a0499b0..e99ff4d4 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java
@@ -60,15 +60,14 @@ public class JobServiceImpl implements IJobService {
@Override
@Transactional
- public long createJob(int userId, JobCreateReq jobCreateRequest)
- throws JsonProcessingException {
+ public long createJob(JobCreateReq jobCreateRequest) throws
JsonProcessingException {
JobReq jobDefinition =
getJobDefinition(jobCreateRequest.getJobConfig());
- long jobId = jobService.createJob(userId, jobDefinition);
- createTasks(userId, jobCreateRequest, jobId);
+ long jobId = jobService.createJob(jobDefinition);
+ createTasks(jobCreateRequest, jobId);
return jobId;
}
- private void createTasks(int userId, JobCreateReq jobCreateRequest, long
jobId)
+ private void createTasks(JobCreateReq jobCreateRequest, long jobId)
throws JsonProcessingException {
List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
Set<String> edgeIds =
@@ -91,7 +90,7 @@ public class JobServiceImpl implements IJobService {
pluginNameVsPluginId.put(pluginIdKey, newPluginId);
}
}
- jobConfigService.updateJobConfig(userId, jobId,
jobCreateRequest.getJobConfig());
+ jobConfigService.updateJobConfig(jobId,
jobCreateRequest.getJobConfig());
JobDAG jobDAG = jobCreateRequest.getJobDAG();
// Replace the plugin name with plugin id
List<Edge> edges = jobDAG.getEdges();
@@ -135,14 +134,14 @@ public class JobServiceImpl implements IJobService {
}
@Override
- public void updateJob(Integer userId, long jobVersionId, JobCreateReq
jobCreateReq)
+ public void updateJob(long jobVersionId, JobCreateReq jobCreateReq)
throws JsonProcessingException {
jobTaskService.deleteTaskByVersionId(jobVersionId);
- createTasks(userId, jobCreateReq, jobVersionId);
+ createTasks(jobCreateReq, jobVersionId);
}
@Override
- public JobRes getJob(Integer userId, long jobVersionId) throws
JsonProcessingException {
+ public JobRes getJob(long jobVersionId) throws JsonProcessingException {
JobConfigRes jobConfig = jobConfigService.getJobConfig(jobVersionId);
JobTaskInfo taskConfig = jobTaskService.getTaskConfig(jobVersionId);
return new JobRes(jobConfig, taskConfig.getPlugins(), new
JobDAG(taskConfig.getEdges()));
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
index d40b69ad..f30ea239 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
@@ -67,7 +67,6 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
@Override
public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(
- Integer userId,
String jobDefineName,
String executorName,
String stateType,
@@ -93,7 +92,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
return result;
}
addRunningTimeToResult(records);
- jobPipelineSummaryMetrics(records, jobMode, userId);
+ jobPipelineSummaryMetrics(records, jobMode);
pageInfo.setTotal((int) jobInstanceIPage.getTotal());
pageInfo.setTotalList(records);
result.setData(pageInfo);
@@ -101,9 +100,9 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
private void populateExecutionMetricsData(
- Integer userId, JobMode jobMode, List<SeaTunnelJobInstanceDto>
records) {
+ JobMode jobMode, List<SeaTunnelJobInstanceDto> records) {
addRunningTimeToResult(records);
- jobPipelineSummaryMetrics(records, jobMode, userId);
+ jobPipelineSummaryMetrics(records, jobMode);
}
private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records)
{
@@ -135,8 +134,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
}
- private void jobPipelineSummaryMetrics(
- List<SeaTunnelJobInstanceDto> records, JobMode jobMode, Integer
userId) {
+ private void jobPipelineSummaryMetrics(List<SeaTunnelJobInstanceDto>
records, JobMode jobMode) {
try {
ArrayList<Long> jobInstanceIdList = new ArrayList<>();
HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new
HashMap<>();
@@ -151,7 +149,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =
jobMetricsService.getALLJobSummaryMetrics(
- userId, jobInstanceIdAndJobEngineIdMap,
jobInstanceIdList, jobMode);
+ jobInstanceIdAndJobEngineIdMap, jobInstanceIdList,
jobMode);
for (SeaTunnelJobInstanceDto taskInstance : records) {
if (jobSummaryMetrics.get(taskInstance.getId()) != null) {
@@ -170,7 +168,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
@Override
- public Result<JobExecutionStatus> getJobExecutionStatus(Integer userId,
long jobInstanceId) {
+ public Result<JobExecutionStatus> getJobExecutionStatus(long
jobInstanceId) {
JobInstance jobInstance =
jobInstanceDao.getJobExecutionStatus(jobInstanceId);
if (jobInstance == null) {
throw new SeatunnelException(
@@ -181,8 +179,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
@Override
- public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
- Integer userId, long jobInstanceId) {
+ public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(long
jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
if (jobInstance == null) {
throw new SeatunnelException(
@@ -190,7 +187,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
SeaTunnelJobInstanceDto executionDetails = convertToDto(jobInstance);
populateExecutionMetricsData(
- userId, jobInstance.getJobType(),
Collections.singletonList(executionDetails));
+ jobInstance.getJobType(),
Collections.singletonList(executionDetails));
return Result.success(executionDetails);
}
@@ -214,7 +211,7 @@ public class TaskInstanceServiceImpl implements
ITaskInstanceService<SeaTunnelJo
}
@Override
- public Result<Void> deleteJobInstanceById(Integer userId, long
jobInstanceId) {
+ public Result<Void> deleteJobInstanceById(long jobInstanceId) {
jobInstanceDao.deleteById(jobInstanceId);
return Result.success();
}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java
index 50303c45..b3efb204 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/VirtualTableServiceImpl.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.IVirtualTableService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceClientFactory;
import
org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.app.utils.ServletUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -73,38 +74,39 @@ public class VirtualTableServiceImpl extends
SeatunnelBaseServiceImpl
@Autowired private ConnectorDataSourceMapperConfig dataSourceMapperConfig;
@Override
- public String createVirtualTable(Integer userId, VirtualTableReq req)
- throws CodeGenerateUtils.CodeGenerateException {
+ public String createVirtualTable(VirtualTableReq tableReq) {
+ Integer userId = ServletUtils.getCurrentUserId();
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.VIRTUAL_TABLE_CREATE,
userId);
- // check user has permission to create virtual table
+
long uuid = CodeGenerateUtils.getInstance().genCode();
- Long datasourceId = Long.valueOf(req.getDatasourceId());
+ Long datasourceId = Long.valueOf(tableReq.getDatasourceId());
boolean isUnique =
virtualTableDao.checkVirtualTableNameUnique(
- req.getTableName(), req.getDatabaseName(), 0L);
+ tableReq.getTableName(), tableReq.getDatabaseName(),
0L);
if (!isUnique) {
throw new SeatunnelException(
- SeatunnelErrorEnum.VIRTUAL_TABLE_ALREADY_EXISTS,
req.getTableName());
+ SeatunnelErrorEnum.VIRTUAL_TABLE_ALREADY_EXISTS,
tableReq.getTableName());
}
VirtualTable virtualTable =
VirtualTable.builder()
.id(uuid)
.datasourceId(datasourceId)
- .virtualDatabaseName(req.getDatabaseName())
- .virtualTableName(req.getTableName())
- .description(req.getDescription())
+ .virtualDatabaseName(tableReq.getDatabaseName())
+ .virtualTableName(tableReq.getTableName())
+ .description(tableReq.getDescription())
.createTime(new Date())
.updateTime(new Date())
.createUserId(userId)
.updateUserId(userId)
.build();
- if (CollectionUtils.isEmpty(req.getTableFields())) {
+ if (CollectionUtils.isEmpty(tableReq.getTableFields())) {
throw new
SeatunnelException(SeatunnelErrorEnum.VIRTUAL_TABLE_FIELD_EMPTY);
}
- String fieldJson = convertTableFields(req.getTableFields());
+ String fieldJson = convertTableFields(tableReq.getTableFields());
virtualTable.setTableFields(fieldJson);
- virtualTable.setVirtualTableConfig(JsonUtils.toJsonString(req.getDatabaseProperties()));
+ virtualTable.setVirtualTableConfig(
+ JsonUtils.toJsonString(tableReq.getDatabaseProperties()));
boolean success = virtualTableDao.insertVirtualTable(virtualTable);
if (!success) {
@@ -134,9 +136,9 @@ public class VirtualTableServiceImpl extends
SeatunnelBaseServiceImpl
}
@Override
- public Boolean updateVirtualTable(
- @NotNull Integer userId, @NotNull String tableId, VirtualTableReq
req) {
- funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.VIRTUAL_TABLE_UPDATE, userId);
+ public boolean updateVirtualTable(@NotNull String tableId, VirtualTableReq
req) {
+ Integer currentUserId = ServletUtils.getCurrentUserId();
+ funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.VIRTUAL_TABLE_UPDATE, currentUserId);
VirtualTable originalTable =
virtualTableDao.selectVirtualTableById(Long.valueOf(tableId));
if (null == originalTable) {
throw new
SeatunnelException(SeatunnelErrorEnum.VIRTUAL_TABLE_NOT_EXISTS);
@@ -159,7 +161,7 @@ public class VirtualTableServiceImpl extends
SeatunnelBaseServiceImpl
.virtualTableName(req.getTableName())
.description(req.getDescription())
.updateTime(new Date())
- .updateUserId(userId)
+ .updateUserId(currentUserId)
.build();
if (CollectionUtils.isNotEmpty(req.getTableFields())) {
String fieldJson = convertTableFields(req.getTableFields());
@@ -172,9 +174,11 @@ public class VirtualTableServiceImpl extends
SeatunnelBaseServiceImpl
}
@Override
- public Boolean deleteVirtualTable(@NotNull Integer userId, @NotNull String
tableId) {
- // todo check has permission and has job using this table
- funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.VIRTUAL_TABLE_DELETE, userId);
+ public boolean deleteVirtualTable(@NotNull String tableId) {
+ Integer currentUserId = ServletUtils.getCurrentUserId();
+
+ funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.VIRTUAL_TABLE_DELETE, currentUserId);
+
VirtualTable virtualTable =
virtualTableDao.selectVirtualTableById(Long.valueOf(tableId));
if (virtualTable == null) {
throw new
SeatunnelException(SeatunnelErrorEnum.VIRTUAL_TABLE_NOT_EXISTS);
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ServletUtils.java
similarity index 66%
copy from seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
copy to seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ServletUtils.java
index ad227422..5fe0b7ae 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/ServletUtils.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.app.service;
+package org.apache.seatunnel.app.utils;
-import org.apache.seatunnel.app.common.Result;
-import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.dal.entity.User;
+import org.apache.seatunnel.app.security.UserContext;
-public interface IJobExecutorService {
+public class ServletUtils {
- Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam
executeParam);
+ public static User getCurrentUser() {
+ return UserContext.getUser();
+ }
- Result<Void> jobPause(Integer userId, Long jobInstanceId);
-
- Result<Void> jobStore(Integer userId, Long jobInstanceId);
+ public static Integer getCurrentUserId() {
+ return getCurrentUser().getId();
+ }
}
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/application.yml b/seatunnel-server/seatunnel-app/src/main/resources/application.yml
index 6a06d519..5a6e3a06 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/application.yml
+++ b/seatunnel-server/seatunnel-app/src/main/resources/application.yml
@@ -41,6 +41,11 @@ spring:
providers:
- DB
#- LDAP # LDAP authentication is disabled by default
+ async-config:
+ core-pool-size: 10
+ max-pool-size: 20
+ queue-capacity: 100
+
jwt:
expireTime: 86400
# please add key when deploy
diff --git a/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/security/AsyncUserContextTest.java b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/security/AsyncUserContextTest.java
new file mode 100644
index 00000000..13ee42f9
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/security/AsyncUserContextTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.security;
+
+import org.apache.seatunnel.app.config.AsyncConfig;
+import org.apache.seatunnel.app.dal.entity.User;
+import org.apache.seatunnel.app.utils.ServletUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests that the user context is passed on to asynchronous tasks. This test focuses only on the
+ * user context functionality and does not require a full Spring context.
+ */
+@Slf4j
+public class AsyncUserContextTest {
+
+ private ThreadPoolTaskExecutor taskExecutor;
+
+ @BeforeEach
+ public void setup() {
+ // Create a standalone thread pool executor for testing
+ taskExecutor = new ThreadPoolTaskExecutor();
+ taskExecutor.setCorePoolSize(10);
+ taskExecutor.setMaxPoolSize(20);
+ taskExecutor.setQueueCapacity(100);
+ taskExecutor.setThreadNamePrefix("TestAsync-");
+ taskExecutor.setTaskDecorator(new
AsyncConfig.ContextCopyingDecorator());
+ taskExecutor.initialize();
+ }
+
+    /** Shuts down the per-test executor and clears the user context of the test thread. */
+    @AfterEach
+    public void cleanup() {
+        try {
+            if (taskExecutor != null) {
+                taskExecutor.shutdown();
+            }
+        } finally {
+            // Clear even if shutdown fails, so a user set by this test is never left behind.
+            UserContext.clear();
+        }
+    }
+
+ @Test
+ public void testMultipleUsersAsync() throws Exception {
+ log.info("Starting multiple users concurrent test...");
+ int userCount = 10;
+ CyclicBarrier barrier = new CyclicBarrier(userCount);
+ CountDownLatch latch = new CountDownLatch(userCount);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ for (int i = 0; i < userCount; i++) {
+ final int userId = i;
+
+ User user = new User();
+ user.setId(userId);
+ user.setUsername("user" + userId);
+
+ log.info(
+ "Setting main thread user context: userId={}, username={}",
+ userId,
+ user.getUsername());
+ UserContext.setUser(user);
+
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ log.info(
+ "Async task waiting to start:
userId={}, threadName={}",
+ userId,
+ Thread.currentThread().getName());
+ barrier.await();
+ log.info(
+ "Async task started: userId={},
threadName={}",
+ userId,
+ Thread.currentThread().getName());
+
+ User asyncUser =
ServletUtils.getCurrentUser();
+ log.info(
+ "Async task got user info:
userId={}, username={}, threadName={}",
+ asyncUser.getId(),
+ asyncUser.getUsername(),
+ Thread.currentThread().getName());
+
+ assertNotNull(asyncUser, "User info should
not be null");
+ assertEquals(userId, asyncUser.getId(),
"User ID mismatch");
+ assertEquals(
+ "user" + userId,
+ asyncUser.getUsername(),
+ "Username mismatch");
+
+ Thread.sleep(100);
+ log.info(
+ "Async task completed: userId={},
threadName={}",
+ userId,
+ Thread.currentThread().getName());
+
+ } catch (Exception e) {
+ log.error("Async task execution failed:
userId=" + userId, e);
+ throw new RuntimeException(e);
+ } finally {
+ latch.countDown();
+ }
+ },
+ taskExecutor);
+
+ futures.add(future);
+ }
+
+ log.info("Waiting for all async tasks to complete...");
+ latch.await();
+
+ try {
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+ log.info("All async tasks completed successfully");
+ } catch (Exception e) {
+ log.error("Some async tasks failed", e);
+ throw e;
+ }
+ }
+
+    /**
+     * Runs a task on the executor which in turn submits a second task to the same executor, and
+     * checks that both levels read user id 1 through {@link ServletUtils#getCurrentUser()}.
+     */
+    @Test
+    public void testNestedAsyncCalls() throws Exception {
+        log.info("Starting nested async calls test...");
+
+        User mainUser = new User();
+        mainUser.setId(1);
+        mainUser.setUsername("testUser");
+
+        log.info(
+                "Setting main thread user context: userId={}, username={}",
+                mainUser.getId(),
+                mainUser.getUsername());
+        UserContext.setUser(mainUser);
+
+        // Inner task; it is submitted from inside the outer task, i.e. from an executor thread.
+        Runnable secondLevel =
+                () -> {
+                    log.info(
+                            "Second level async call started: threadName={}",
+                            Thread.currentThread().getName());
+                    User secondLevelUser = ServletUtils.getCurrentUser();
+                    log.info(
+                            "Second level async call got user info: userId={}, username={}",
+                            secondLevelUser.getId(),
+                            secondLevelUser.getUsername());
+                    assertEquals(1, secondLevelUser.getId());
+                };
+
+        // Outer task; verifies its own view of the user, then blocks until the inner task is done.
+        Runnable firstLevel =
+                () -> {
+                    log.info(
+                            "First level async call started: threadName={}",
+                            Thread.currentThread().getName());
+                    User firstLevelUser = ServletUtils.getCurrentUser();
+                    log.info(
+                            "First level async call got user info: userId={}, username={}",
+                            firstLevelUser.getId(),
+                            firstLevelUser.getUsername());
+                    assertEquals(1, firstLevelUser.getId());
+
+                    CompletableFuture<Void> nested =
+                            CompletableFuture.runAsync(secondLevel, taskExecutor);
+                    nested.join();
+
+                    log.info("Nested async calls completed");
+                };
+
+        CompletableFuture.runAsync(firstLevel, taskExecutor).join();
+        log.info("Nested async calls test completed");
+    }
+
+ @Test
+ public void testUserContextIsolation() throws Exception {
+ log.info("Starting user context isolation test...");
+ CountDownLatch latch = new CountDownLatch(2);
+
+ // First user
+ User user1 = new User();
+ user1.setId(1);
+ UserContext.setUser(user1);
+ log.info("Setting first user context: userId={}", user1.getId());
+
+ // Capture current user to avoid context loss during thread switching
+ User capturedUser1 = user1;
+ CompletableFuture<Integer> future1 =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ log.info(
+ "First user's async task started:
threadName={}",
+ Thread.currentThread().getName());
+ Thread.sleep(1000);
+ UserContext.setUser(capturedUser1);
+ User currentUser =
ServletUtils.getCurrentUser();
+ log.info(
+ "First user's async task got user:
userId={}",
+ currentUser.getId());
+ return currentUser.getId();
+ } catch (Exception e) {
+ log.error("First user's async task failed", e);
+ throw new RuntimeException(e);
+ } finally {
+ UserContext.clear();
+ latch.countDown();
+ }
+ },
+ taskExecutor);
+
+ // Second user
+ User user2 = new User();
+ user2.setId(2);
+ UserContext.setUser(user2);
+ log.info("Setting second user context: userId={}", user2.getId());
+
+ // Capture current user to avoid context loss during thread switching
+ User capturedUser2 = user2;
+ CompletableFuture<Integer> future2 =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ log.info(
+ "Second user's async task started:
threadName={}",
+ Thread.currentThread().getName());
+ UserContext.setUser(capturedUser2);
+ User currentUser =
ServletUtils.getCurrentUser();
+ log.info(
+ "Second user's async task got user:
userId={}",
+ currentUser.getId());
+ return currentUser.getId();
+ } catch (Exception e) {
+ log.error("Second user's async task failed",
e);
+ throw new RuntimeException(e);
+ } finally {
+ UserContext.clear();
+ latch.countDown();
+ }
+ },
+ taskExecutor);
+
+ log.info("Waiting for async tasks to complete...");
+ latch.await();
+
+ int result1 = future1.get();
+ int result2 = future2.get();
+ log.info("Verification results: user1={}, user2={}", result1, result2);
+ assertEquals(1, result1);
+ assertEquals(2, result2);
+ log.info("User context isolation test completed");
+
+ UserContext.clear();
+ }
+}