This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 071994933b [Fix-16740][SeaTunnel-Task] fix can't submit resource
center config file issue (#16741)
071994933b is described below
commit 071994933b05850e7dd5f7bccbf45d867640e244
Author: Jarvis <[email protected]>
AuthorDate: Thu Oct 31 21:29:16 2024 +0800
[Fix-16740][SeaTunnel-Task] fix can't submit resource center config file
issue (#16741)
---
.../plugin/task/seatunnel/SeatunnelTask.java | 28 ++++-----
.../plugin/task/seatunnel/SeatunnelTaskTest.java | 70 +++++++++++++++++++---
.../projects/task/components/node/format-data.ts | 4 +-
3 files changed, 78 insertions(+), 24 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 2aadab8039..b5b85427f7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import
org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@@ -154,26 +155,21 @@ public class SeatunnelTask extends AbstractRemoteTask {
protected List<String> buildOptions() throws Exception {
List<String> args = new ArrayList<>();
+ args.add(CONFIG_OPTIONS);
+ String scriptContent;
if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
- args.add(CONFIG_OPTIONS);
- args.add(buildCustomConfigCommand());
+ scriptContent = buildCustomConfigContent();
} else {
- seatunnelParameters.getResourceList().forEach(resourceInfo -> {
- args.add(CONFIG_OPTIONS);
- // TODO: Need further check for refactored resource center
- // TODO Currently resourceName is `/xxx.sh`, it has more `/`
and needs to be optimized
- args.add(resourceInfo.getResourceName().replaceFirst(".*:",
""));
- });
+ String resourceFileName =
seatunnelParameters.getResourceList().get(0).getResourceName();
+ ResourceContext resourceContext =
taskExecutionContext.getResourceContext();
+ scriptContent = FileUtils.readFileToString(
+ new
File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
+ StandardCharsets.UTF_8);
}
- return args;
- }
-
- protected String buildCustomConfigCommand() throws Exception {
- String config = buildCustomConfigContent();
String filePath = buildConfigFilePath();
- createConfigFileIfNotExists(config, filePath);
-
- return filePath;
+ createConfigFileIfNotExists(scriptContent, filePath);
+ args.add(filePath);
+ return args;
}
private String buildCustomConfigContent() {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
similarity index 54%
rename from
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
rename to
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
index 70abee3281..11fffedd80 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
@@ -14,35 +14,91 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.junit.Test;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
+
+import org.apache.commons.io.FileUtils;
+
+import java.util.Collections;
+
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
public class SeatunnelTaskTest {
- private static final String EXECUTE_PATH = "/home";
- private static final String TASK_APPID = "9527";
+
+ private static final String EXECUTE_PATH = "/tmp";
+ private static final String RESOURCE_SCRIPT_PATH = "/tmp/demo.conf";
+
+ private MockedStatic<FileUtils> mockedStaticFileUtils;
+
+ @BeforeEach
+ public void setUp() {
+ mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
+ }
+
+ @AfterEach
+ public void after() {
+ mockedStaticFileUtils.close();
+ }
@Test
- public void formatDetector() throws Exception{
+ public void formatDetector() throws Exception {
+ String taskId = "1234";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
+ seatunnelParameters.setUseCustom(true);
seatunnelParameters.setRawScript(RAW_SCRIPT);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
- taskExecutionContext.setTaskAppId(TASK_APPID);
+ taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
- Assertions.assertEquals("/home/seatunnel_9527.conf",
seatunnelTask.buildCustomConfigCommand());
+ String command1 = String.join(" ", seatunnelTask.buildOptions());
+ String expectedCommand1 = String.format("--config
%s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
+ Assertions.assertEquals(expectedCommand1, command1);
seatunnelParameters.setRawScript(RAW_SCRIPT_2);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
- Assertions.assertEquals("/home/seatunnel_9527.json",
seatunnelTask.buildCustomConfigCommand());
+ String command2 = String.join(" ", seatunnelTask.buildOptions());
+ String expectedCommand2 = String.format("--config
%s/seatunnel_%s.json", EXECUTE_PATH, taskId);
+ Assertions.assertEquals(expectedCommand2, command2);
+ }
+
+ @Test
+ public void testReadConfigFromResourceCenter() throws Exception {
+ String taskId = "2345";
+ SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
+ seatunnelParameters.setUseCustom(false);
+ ResourceInfo resourceInfo = new ResourceInfo();
+ resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH);
+
seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo));
+
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setExecutePath(EXECUTE_PATH);
+ taskExecutionContext.setTaskAppId(taskId);
+
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
+ ResourceContext resourceContext = new ResourceContext();
+ resourceContext.addResourceItem(new
ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
+ taskExecutionContext.setResourceContext(resourceContext);
+
+ SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
+ seatunnelTask.setSeatunnelParameters(seatunnelParameters);
+ String command = String.join(" ", seatunnelTask.buildOptions());
+ String expectedCommand = String.format("--config
%s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
+ Assertions.assertEquals(expectedCommand, command);
}
+
private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index fc5a1d9466..006756f10d 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -205,7 +205,9 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SEATUNNEL') {
taskParams.startupScript = data.startupScript
taskParams.useCustom = data.useCustom
- taskParams.rawScript = data.rawScript
+ if (!data.useCustom) {
+ taskParams.rawScript = ''
+ }
if (data.startupScript?.includes('flink')) {
taskParams.runMode = data.runMode
taskParams.others = data.others