gabrywu commented on a change in pull request #2872:
URL: 
https://github.com/apache/incubator-dolphinscheduler/pull/2872#discussion_r436667893



##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
##########
@@ -50,23 +50,42 @@ private JSONUtils() {
         
objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT,
 true).setTimeZone(TimeZone.getDefault());
     }
 
+    public static ObjectMapper getMapper() {

Review comment:
       getMapper should be private; we should not use ObjectMapper directly.

##########
File path: 
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
##########
@@ -551,7 +562,13 @@ private void addDependResultForTaskList(List<TaskInstance> 
taskInstanceList) thr
             String localParams = map.get(LOCAL_PARAMS);
             if (localParams != null && !localParams.isEmpty()) {
                 localParams = 
ParameterUtils.convertParameterPlaceholders(localParams, timeParams);
-                List<Property> localParamsList = JSON.parseArray(localParams, 
Property.class);
+                List<Property> localParamsList = new ArrayList<>();
+                try {
+                    localParamsList = 
JSONUtils.getMapper().readValue(localParams, new 
TypeReference<List<Property>>() {});

Review comment:
       Don't use getMapper directly; extract a method that deserializes a List<?>.

##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
##########
@@ -408,9 +406,8 @@ public boolean isYarnEnabled() {
      *
      * @param applicationId application id
      * @return the return may be null or there may be other parse exceptions
-     * @throws JSONException json exception
      */
-    public ExecutionStatus getApplicationStatus(String applicationId) throws 
JSONException {
+    public ExecutionStatus getApplicationStatus(String applicationId)  {

Review comment:
       Why remove the 'throws JSONException'? I think it's better not to remove it.

##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
##########
@@ -421,15 +418,15 @@ public ExecutionStatus getApplicationStatus(String 
applicationId) throws JSONExc
 
         String responseContent = HttpUtils.get(applicationUrl);
         if (responseContent != null) {
-            JSONObject jsonObject = JSON.parseObject(responseContent);
-            result = jsonObject.getJSONObject("app").getString("finalStatus");
+            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
+            result = jsonObject.path("app").path("finalStatus").asText();
         } else {
             //may be in job history
             String jobHistoryUrl = getJobHistoryUrl(applicationId);
             logger.info("jobHistoryUrl={}", jobHistoryUrl);
             responseContent = HttpUtils.get(jobHistoryUrl);
-            JSONObject jsonObject = JSONObject.parseObject(responseContent);
-            result = jsonObject.getJSONObject("job").getString("state");
+            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
+            result = jsonObject.path("job").path("state").asText();

Review comment:
       result = jsonObject.path("job").path("state").asText() always assigns a 
value to result, with an empty string standing in for a null value. In the 
original logic, a null value would throw an NPE in the following switch. Now 
the result is an empty string, so the getApplicationStatus method will return 
ExecutionStatus.RUNNING_EXEUTION. I think you'd better optimize the switch logic.

##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
##########
@@ -421,15 +418,15 @@ public ExecutionStatus getApplicationStatus(String 
applicationId) throws JSONExc
 
         String responseContent = HttpUtils.get(applicationUrl);
         if (responseContent != null) {
-            JSONObject jsonObject = JSON.parseObject(responseContent);
-            result = jsonObject.getJSONObject("app").getString("finalStatus");
+            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
+            result = jsonObject.path("app").path("finalStatus").asText();
         } else {
             //may be in job history
             String jobHistoryUrl = getJobHistoryUrl(applicationId);
             logger.info("jobHistoryUrl={}", jobHistoryUrl);
             responseContent = HttpUtils.get(jobHistoryUrl);
-            JSONObject jsonObject = JSONObject.parseObject(responseContent);
-            result = jsonObject.getJSONObject("job").getString("state");
+            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
+            result = jsonObject.path("job").path("state").asText();

Review comment:
       result = jsonObject.path("job").path("state").asText() always assigns a 
value to result, with an empty string standing in for a null value. In the 
original logic, a null value would throw an NPE in the following switch. Now 
the result is an empty string, so the getApplicationStatus method will return 
ExecutionStatus.RUNNING_EXEUTION. I think you'd better optimize the switch 
logic. This is only a suggestion.

##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
##########
@@ -669,10 +667,10 @@ public static String getRMState(String url) {
                 return null;
             }
             //to json
-            JSONObject jsonObject = JSON.parseObject(retStr);
+            ObjectNode jsonObject = JSONUtils.parseObject(retStr);
 
             //get ResourceManager state
-            return 
jsonObject.getJSONObject("clusterInfo").getString("haState");
+            return jsonObject.get("clusterInfo").path("haState").asText();

Review comment:
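       jsonObject.get("clusterInfo") returns null when the field is missing, so 
the chained .path("haState") call will throw an NPE; use path("clusterInfo") 
instead.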

##########
File path: 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
##########
@@ -50,23 +50,42 @@ private JSONUtils() {
         
objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT,
 true).setTimeZone(TimeZone.getDefault());
     }
 
+    public static ObjectMapper getMapper() {

Review comment:
       getMapper, createArrayNode and createObjectNode should be private; we 
should not use ObjectMapper directly.

##########
File path: 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
##########
@@ -271,7 +279,16 @@ public String getGlobalParams() {
     }
 
     public void setGlobalParams(String globalParams) {
-        this.globalParamList = JSON.parseArray(globalParams, Property.class);
+        if (globalParams == null){
+            this.globalParamList = new ArrayList<>();
+        }else {
+            try {
+                this.globalParamList = 
JSONUtils.getMapper().readValue(globalParams, new 
TypeReference<List<Property>>() {

Review comment:
       don't use getMapper directly

##########
File path: 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
##########
@@ -280,15 +297,21 @@ public void setGlobalParams(String globalParams) {
     }
 
     public void setGlobalParamList(List<Property> globalParamList) {
-        this.globalParams = JSON.toJSONString(globalParamList);
+        this.globalParams = JSONUtils.toJsonString(globalParamList);
         this.globalParamList = globalParamList;
     }
 
     public Map<String, String> getGlobalParamMap() {
-        List<Property> propList;
+        List<Property> propList = new ArrayList<> ();
 
         if (globalParamMap == null && StringUtils.isNotEmpty(globalParams)) {
-            propList = JSON.parseArray(globalParams, Property.class);
+            try {
+                propList = JSONUtils.getMapper().readValue(globalParams, new 
TypeReference<List<Property>>() {

Review comment:
       don't use getMapper directly

##########
File path: 
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
##########
@@ -280,15 +297,21 @@ public void setGlobalParams(String globalParams) {
     }
 
     public void setGlobalParamList(List<Property> globalParamList) {
-        this.globalParams = JSON.toJSONString(globalParamList);
+        this.globalParams = JSONUtils.toJsonString(globalParamList);
         this.globalParamList = globalParamList;
     }
 
     public Map<String, String> getGlobalParamMap() {
-        List<Property> propList;
+        List<Property> propList = new ArrayList<> ();
 
         if (globalParamMap == null && StringUtils.isNotEmpty(globalParams)) {
-            propList = JSON.parseArray(globalParams, Property.class);
+            try {
+                propList = JSONUtils.getMapper().readValue(globalParams, new 
TypeReference<List<Property>>() {

Review comment:
       don't use getMapper directly

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
##########
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
public class TaskExecuteRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public TaskExecuteRequestCommand() {
    }

    public TaskExecuteRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteRequestCommand{" +
                "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
public class TaskExecuteRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public TaskExecuteRequestCommand() {
    }

    public TaskExecuteRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_REQUEST);
        byte[] body = JacksonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteRequestCommand{" +
                "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}

Review comment:
       Wow, what's the format of this file?

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
##########
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task request command
 */
public class TaskExecuteAckCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * startTime
     */
    private Date startTime;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;

    /**
     * logPath
     */
    private String logPath;

    /**
     * executePath
     */
    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task request command
 */
public class TaskExecuteAckCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * startTime
     */
    private Date startTime;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;

    /**
     * logPath
     */
    private String logPath;

    /**
     * executePath
     */
    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_ACK);
        byte[] body = JacksonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}

Review comment:
       Wow, what's the format of this file?

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
##########
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task response command
 */
public class TaskExecuteResponseCommand implements Serializable {


    public TaskExecuteResponseCommand() {
    }

    public TaskExecuteResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    /**
     * processId
     */
    private int processId;

    /**
     * appIds
     */
    private String appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getAppIds() {
        return appIds;
    }

    public void setAppIds(String appIds) {
        this.appIds = appIds;
    }

    /**
     * package response command
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", status=" + status +
                ", endTime=" + endTime +
                ", processId=" + processId +
                ", appIds='" + appIds + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task response command
 */
public class TaskExecuteResponseCommand implements Serializable {


    public TaskExecuteResponseCommand() {
    }

    public TaskExecuteResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    /**
     * processId
     */
    private int processId;

    /**
     * appIds
     */
    private String appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getAppIds() {
        return appIds;
    }

    public void setAppIds(String appIds) {
        this.appIds = appIds;
    }

    /**
     * package response command
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_RESPONSE);
        byte[] body = JacksonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", status=" + status +
                ", endTime=" + endTime +
                ", processId=" + processId +
                ", appIds='" + appIds + '\'' +
                '}';
    }
}

Review comment:
       Wow, what's the format of this file?

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
##########
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public class TaskKillRequestCommand implements Serializable {

    /**
     *  task id
     */
    private int taskInstanceId;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillRequestCommand{" +
                "taskInstanceId=" + taskInstanceId +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public class TaskKillRequestCommand implements Serializable {

    /**
     *  task id
     */
    private int taskInstanceId;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_REQUEST);
        byte[] body = JacksonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillRequestCommand{" +
                "taskInstanceId=" + taskInstanceId +
                '}';
    }
}

Review comment:
       Wow, what's the format of this file?

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
##########
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 *  kill task response command
 */
public class TaskKillResponseCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;


    /**
     * processId
     */
    private int processId;

    /**
     * other resource manager appId , for example : YARN etc
     */
    protected List<String> appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public List<String> getAppIds() {
        return appIds;
    }

    public void setAppIds(List<String> appIds) {
        this.appIds = appIds;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", processId=" + processId +
                ", appIds=" + appIds +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.JacksonSerializer;

import java.io.Serializable;
import java.util.List;

/**
 *  kill task response command
 */
public class TaskKillResponseCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;


    /**
     * processId
     */
    private int processId;

    /**
     * other resource manager appId , for example : YARN etc
     */
    protected List<String> appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public List<String> getAppIds() {
        return appIds;
    }

    public void setAppIds(List<String> appIds) {
        this.appIds = appIds;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_RESPONSE);
        byte[] body = JacksonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", processId=" + processId +
                ", appIds=" + appIds +
                '}';
    }
}

Review comment:
       Wow, what is the format of this file?

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
##########
@@ -56,7 +56,7 @@ public void setPath(String path) {
     public Command convert2Command(){
         Command command = new Command();
         command.setType(CommandType.GET_LOG_BYTES_REQUEST);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
##########
@@ -57,7 +57,7 @@ public void setData(byte[] data) {
     public Command convert2Command(long opaque){
         Command command = new Command(opaque);
         command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
##########
@@ -84,7 +84,7 @@ public void setLimit(int limit) {
     public Command convert2Command(){
         Command command = new Command();
         command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
##########
@@ -57,7 +57,7 @@ public void setMsg(String msg) {
     public Command convert2Command(long opaque){
         Command command = new Command(opaque);
         command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
##########
@@ -56,7 +56,7 @@ public void setPath(String path) {
     public Command convert2Command(){
         Command command = new Command();
         command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
##########
@@ -57,7 +57,7 @@ public void setMsg(String msg) {
     public Command convert2Command(long opaque){
         Command command = new Command(opaque);
         command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
-        byte[] body = FastJsonSerializer.serialize(this);
+        byte[] body = JacksonSerializer.serialize(this);

Review comment:
       It would be better to rename JacksonSerializer to JsonSerializer.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JacksonSerializer.java
##########
@@ -31,7 +39,13 @@
         * @return byte array
         */
        public static <T> byte[] serialize(T obj)  {
-               String json = JSON.toJSONString(obj);
+        String json = "";

Review comment:
       In the original logic, if obj cannot be formatted to JSON, an exception is thrown.
However, the current method returns an empty string, which may change the logic of the
invoker. You had better make sure that this will not have any impact on the invoker.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JacksonSerializer.java
##########
@@ -42,7 +56,14 @@
         * @return string
         */
        public static <T> String serializeToString(T obj)  {
-               return JSON.toJSONString(obj);
+               String json = "";
+               try {

Review comment:
       In the original logic, if obj cannot be formatted to JSON, an exception is thrown.
However, the current method returns an empty string, which may change the logic of the
invoker.

##########
File path: 
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JacksonSerializer.java
##########
@@ -54,7 +75,15 @@
         * @return deserialize type
         */
        public static <T> T deserialize(byte[] src, Class<T> clazz) {
-               return JSON.parseObject(src, clazz);
+
+        String json = new String(src, StandardCharsets.UTF_8);

Review comment:
       In the original logic, if obj cannot be formatted to JSON, an exception is thrown.
However, the current method returns an empty string, which may change the logic of the
invoker.

##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
##########
@@ -152,7 +153,13 @@ public void run() {
         // global params string
         String globalParamsStr = taskExecutionContext.getGlobalParams();
         if (globalParamsStr != null) {
-            List<Property> globalParamsList = 
JSONObject.parseArray(globalParamsStr, Property.class);
+            List<Property> globalParamsList = new ArrayList<>();
+
+            try {

Review comment:
       Do not use getMapper directly.

##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
##########
@@ -236,64 +222,90 @@ private String buildDataxJsonFile()
 
     /**
      * build datax job config
-     * 
+     *
      * @return collection of datax job config JSONObject
      * @throws SQLException if error throws SQLException
      */
-    private List<JSONObject> buildDataxJobContentJson() throws SQLException {
-        DataxTaskExecutionContext dataxTaskExecutionContext = 
taskExecutionContext.getDataxTaskExecutionContext();
+    private List<ObjectNode> buildDataxJobContentJson() throws SQLException {
 
+        DataxTaskExecutionContext dataxTaskExecutionContext = 
taskExecutionContext.getDataxTaskExecutionContext();
 
         BaseDataSource dataSourceCfg = 
DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()),
                 dataxTaskExecutionContext.getSourceConnectionParams());
 
         BaseDataSource dataTargetCfg = 
DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()),
                 dataxTaskExecutionContext.getTargetConnectionParams());
 
-        List<JSONObject> readerConnArr = new ArrayList<>();
-        JSONObject readerConn = new JSONObject();
-        readerConn.put("querySql", new String[] {dataXParameters.getSql()});
-        readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()});
+        List<ObjectNode> readerConnArr = new ArrayList<>();
+        ObjectNode readerConn = JSONUtils.createObjectNode();
+
+        ArrayNode sqlArr = readerConn.putArray("querySql");
+        for (String sql : new String[]{dataXParameters.getSql()}) {
+            sqlArr.add(sql);
+        }
+
+        ArrayNode urlArr = readerConn.putArray("jdbcUrl");
+        for (String url : new String[]{dataSourceCfg.getJdbcUrl()}) {
+            urlArr.add(url);
+        }
+
         readerConnArr.add(readerConn);
 
-        JSONObject readerParam = new JSONObject();
+        ObjectNode readerParam = JSONUtils.createObjectNode();
         readerParam.put("username", dataSourceCfg.getUser());
         readerParam.put("password", dataSourceCfg.getPassword());
-        readerParam.put("connection", readerConnArr);
+        readerParam.putArray("connection").addAll(readerConnArr);
 
-        JSONObject reader = new JSONObject();
+
+        ObjectNode reader = JSONUtils.createObjectNode();
         reader.put("name", 
DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
-        reader.put("parameter", readerParam);
+        reader.set("parameter", readerParam);
+
+        List<ObjectNode> writerConnArr = new ArrayList<>();
+        ObjectNode writerConn = JSONUtils.createObjectNode();

Review comment:
       It would be better to abstract this logic into one method of JSONUtils that puts
an array into an ObjectNode.

##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
##########
@@ -236,64 +222,90 @@ private String buildDataxJsonFile()
 
     /**
      * build datax job config
-     * 
+     *
      * @return collection of datax job config JSONObject
      * @throws SQLException if error throws SQLException
      */
-    private List<JSONObject> buildDataxJobContentJson() throws SQLException {
-        DataxTaskExecutionContext dataxTaskExecutionContext = 
taskExecutionContext.getDataxTaskExecutionContext();
+    private List<ObjectNode> buildDataxJobContentJson() throws SQLException {
 
+        DataxTaskExecutionContext dataxTaskExecutionContext = 
taskExecutionContext.getDataxTaskExecutionContext();
 
         BaseDataSource dataSourceCfg = 
DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()),
                 dataxTaskExecutionContext.getSourceConnectionParams());
 
         BaseDataSource dataTargetCfg = 
DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()),
                 dataxTaskExecutionContext.getTargetConnectionParams());
 
-        List<JSONObject> readerConnArr = new ArrayList<>();
-        JSONObject readerConn = new JSONObject();
-        readerConn.put("querySql", new String[] {dataXParameters.getSql()});
-        readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()});
+        List<ObjectNode> readerConnArr = new ArrayList<>();
+        ObjectNode readerConn = JSONUtils.createObjectNode();
+
+        ArrayNode sqlArr = readerConn.putArray("querySql");
+        for (String sql : new String[]{dataXParameters.getSql()}) {
+            sqlArr.add(sql);
+        }
+
+        ArrayNode urlArr = readerConn.putArray("jdbcUrl");
+        for (String url : new String[]{dataSourceCfg.getJdbcUrl()}) {
+            urlArr.add(url);
+        }
+
         readerConnArr.add(readerConn);
 
-        JSONObject readerParam = new JSONObject();
+        ObjectNode readerParam = JSONUtils.createObjectNode();
         readerParam.put("username", dataSourceCfg.getUser());
         readerParam.put("password", dataSourceCfg.getPassword());
-        readerParam.put("connection", readerConnArr);
+        readerParam.putArray("connection").addAll(readerConnArr);
 
-        JSONObject reader = new JSONObject();
+
+        ObjectNode reader = JSONUtils.createObjectNode();
         reader.put("name", 
DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
-        reader.put("parameter", readerParam);
+        reader.set("parameter", readerParam);
+
+        List<ObjectNode> writerConnArr = new ArrayList<>();
+        ObjectNode writerConn = JSONUtils.createObjectNode();
+        ArrayNode tableArr = writerConn.putArray("table");
+        for (String table : new String[]{dataXParameters.getTargetTable()}) {
+            tableArr.add(table);
+        }
 
-        List<JSONObject> writerConnArr = new ArrayList<>();
-        JSONObject writerConn = new JSONObject();
-        writerConn.put("table", new String[] 
{dataXParameters.getTargetTable()});
         writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl());
         writerConnArr.add(writerConn);
 
-        JSONObject writerParam = new JSONObject();
+        ObjectNode writerParam = JSONUtils.createObjectNode();
         writerParam.put("username", dataTargetCfg.getUser());
         writerParam.put("password", dataTargetCfg.getPassword());
-        writerParam.put("column",
-            
parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
-                    DbType.of(dataxTaskExecutionContext.getTargetType()),
-                    dataSourceCfg, dataXParameters.getSql()));
-        writerParam.put("connection", writerConnArr);
+
+        String[] columns = 
parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
+                DbType.of(dataxTaskExecutionContext.getTargetType()),
+                dataSourceCfg, dataXParameters.getSql());
+        ArrayNode columnArr = writerParam.putArray("column");

Review comment:
       Why not use addAll?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to