This is an automated email from the ASF dual-hosted git repository.

wangyang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3fe9fd45b1 [Fix-15706] Seatunnel improvement (#15852)
3fe9fd45b1 is described below

commit 3fe9fd45b12e2e6d190729e64b2c4663b0752ada
Author: XinXing <49302071+xinxi...@users.noreply.github.com>
AuthorDate: Thu Apr 18 10:36:18 2024 +0800

    [Fix-15706] Seatunnel improvement (#15852)
    
    * fix_seatunnel_15706
    
    * CodeFormat
    
    * Change to use JSONUtils
    
    * Constants moved to constant code
    
    * Delete empty lines
    
    * Delete empty lines
---
 .../dolphinscheduler/common/utils/JSONUtils.java   |  6 +-
 .../plugin/task/seatunnel/Constants.java           |  2 +
 .../plugin/task/seatunnel/SeatunnelTask.java       |  9 ++-
 .../plugin/task/seatunnel/SeatunnelTaskTest.java   | 93 ++++++++++++++++++++++
 4 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 6750b364f9..bfc3af2c58 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -196,7 +196,10 @@ public final class JSONUtils {
      * @return true if valid
      */
     public static boolean checkJsonValid(String json) {
+        return checkJsonValid(json, true);
+    }
 
+    public static boolean checkJsonValid(String json, Boolean logFlag) {
         if (Strings.isNullOrEmpty(json)) {
             return false;
         }
@@ -205,7 +208,8 @@ public final class JSONUtils {
             objectMapper.readTree(json);
             return true;
         } catch (IOException e) {
-            log.error("check json object valid exception!", e);
+            if (logFlag)
+                log.error("check json object valid exception!", e);
         }
 
         return false;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
index fb1c52ee18..5f0f22b0a1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
@@ -29,5 +29,7 @@ public class Constants {
     public static final String STARTUP_SCRIPT_SPARK = "spark";
     public static final String STARTUP_SCRIPT_FLINK = "flink";
     public static final String STARTUP_SCRIPT_SEATUNNEL = "seatunnel";
+    public static final String JSON_SUFFIX = "json";
+    public static final String CONF_SUFFIX = "conf";
 
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 547e0158ef..2aadab8039 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -184,8 +184,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     private String buildConfigFilePath() {
-        return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId());
+        return String.format("%s/seatunnel_%s.%s", taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTaskAppId(), formatDetector());
+    }
+
+    private String formatDetector() {
+        return JSONUtils.checkJsonValid(seatunnelParameters.getRawScript(), false) ? Constants.JSON_SUFFIX
+                : Constants.CONF_SUFFIX;
     }
 
     private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
new file mode 100644
index 0000000000..8f4c2a815a
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.seatunnel;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+public class SeatunnelTaskTest {
+    private static final String EXECUTE_PATH = "/home";
+    private static final String TASK_APPID = "9527";
+
+    @Test
+    public void formatDetector() throws Exception{
+        SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
+        seatunnelParameters.setRawScript(RAW_SCRIPT);
+
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setExecutePath(EXECUTE_PATH);
+        taskExecutionContext.setTaskAppId(TASK_APPID);
+        taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
+
+        SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
+        seatunnelTask.setSeatunnelParameters(seatunnelParameters);
+        Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand());
+
+        seatunnelParameters.setRawScript(RAW_SCRIPT_2);
+        seatunnelTask.setSeatunnelParameters(seatunnelParameters);
+        Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand());
+    }
+    private static final String RAW_SCRIPT = "env {\n" +
+            "  execution.parallelism = 2\n" +
+            "  job.mode = \"BATCH\"\n" +
+            "  checkpoint.interval = 10000\n" +
+            "}\n" +
+            "\n" +
+            "source {\n" +
+            "  FakeSource {\n" +
+            "    parallelism = 2\n" +
+            "    result_table_name = \"fake\"\n" +
+            "    row.num = 16\n" +
+            "    schema = {\n" +
+            "      fields {\n" +
+            "        name = \"string\"\n" +
+            "        age = \"int\"\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}\n" +
+            "\n" +
+            "sink {\n" +
+            "  Console {\n" +
+            "  }\n" +
+            "}";
+    private static final String RAW_SCRIPT_2 = "{\n" +
+            "  \"env\": {\n" +
+            "    \"execution.parallelism\": 2,\n" +
+            "    \"job.mode\": \"BATCH\",\n" +
+            "    \"checkpoint.interval\": 10000\n" +
+            "  },\n" +
+            "  \"source\": {\n" +
+            "    \"FakeSource\": {\n" +
+            "      \"parallelism\": 2,\n" +
+            "      \"result_table_name\": \"fake\",\n" +
+            "      \"row.num\": 16,\n" +
+            "      \"schema\": {\n" +
+            "        \"fields\": {\n" +
+            "          \"name\": \"string\",\n" +
+            "          \"age\": \"int\"\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  },\n" +
+            "  \"sink\": {\n" +
+            "    \"Console\": {}\n" +
+            "  }\n" +
+            "}";
+}
\ No newline at end of file

Reply via email to