zhongjiajie commented on a change in pull request #6269:
URL: https://github.com/apache/dolphinscheduler/pull/6269#discussion_r715655903



##########
File path: 
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.*;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.annotation.ComponentScan;
+import py4j.GatewayServer;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+
+@ComponentScan(value = "org.apache.dolphinscheduler")
+public class PythonGatewayServer {
+    // TODO could this work in py4j
+//    @Autowired
+//    private ProcessDefinitionMapper processDefinitionMapper;
+
+    // Used to look up a project by name and to create projects.
+    @Autowired
+    private ProjectService projectService;
+
+    // Used by createTenant.
+    @Autowired
+    private TenantService tenantService;
+
+    // Used by execProcessInstance to start a process instance.
+    @Autowired
+    private ExecutorService executorService;
+
+    // Used to verify a process definition name and to create process definitions.
+    @Autowired
+    private ProcessDefinitionService processDefinitionService;
+
+    // Used by genTaskCodeList.
+    @Autowired
+    private TaskDefinitionService taskDefinitionService;
+
+
+    // Built-in admin user that is passed as the acting user to every service call made by this class.
+    // NOTE(review): the double-brace initializer creates an anonymous subclass of User that keeps a
+    // reference to the enclosing PythonGatewayServer; confirm downstream code tolerates that.
+    private final User userPythonGateway = new User() {
+        {
+            setId(Integer.MAX_VALUE);
+            setUserName("userPythonGateway");
+            setUserType(UserType.ADMIN_USER);
+        }
+    };
+
+    // Built-in queue; only its id is read in this class (passed to tenantService.createTenant).
+    // NOTE(review): same double-brace initialization as userPythonGateway, so this is an anonymous
+    // subclass of Queue rather than a plain Queue instance.
+    private final Queue queuePythonGateway = new Queue() {
+        {
+            setId(Integer.MAX_VALUE);
+            setQueueName("queuePythonGateway");
+        }
+    };
+
+    /**
+     * Return a fixed reply. Presumably lets a client check that the gateway entry point is reachable.
+     *
+     * @return the constant string {@code "PONG"}
+     */
+    public String ping() {
+        final String reply = "PONG";
+        return reply;
+    }
+
+    // TODO: Should packages be imported on the Python client side? The utils package can be
+    // imported there but the service package cannot -- why?
+    // Core api
+    /**
+     * Delegate to {@code taskDefinitionService.genTaskCodeList} as the built-in gateway user.
+     *
+     * @param genNum forwarded unchanged to the service (presumably the number of codes to generate)
+     * @return the result map returned by the task definition service
+     */
+    public Map<String, Object> genTaskCodeList(Integer genNum) {
+        final Map<String, Object> generated = taskDefinitionService.genTaskCodeList(userPythonGateway, genNum);
+        return generated;
+    }
+
+    /**
+     * Create or update a process definition.
+     * <p>
+     * The owning project is looked up by {@code projectName}. The status returned by
+     * {@code processDefinitionService.verifyProcessDefinitionName} then selects either the update
+     * branch (not implemented yet, returns a placeholder) or the create branch.
+     * All requests are issued as the built-in gateway user.
+     *
+     * @param projectName        name of the project that owns the process definition
+     * @param name               process definition name
+     * @param description        description
+     * @param globalParams       global params
+     * @param locations          locations for nodes
+     * @param timeout            timeout
+     * @param tenantCode         tenantCode
+     * @param taskRelationJson   relation json for nodes
+     * @param taskDefinitionJson taskDefinitionJson
+     * @return the code of the created process definition, or the placeholder {@code 1L} from the
+     *         update branch
+     * @throws IllegalArgumentException if the project service returns no project for {@code projectName}
+     */
+    public Long createOrUpdateProcessDefinition(String projectName,
+                                                String name,
+                                                String description,
+                                                String globalParams,
+                                                String locations,
+                                                int timeout,
+                                                String tenantCode,
+                                                String taskRelationJson,
+                                                String taskDefinitionJson) {
+        // Look the project up by its own name; `name` is the process definition name.
+        Project project = (Project) projectService.queryByName(userPythonGateway, projectName).get(Constants.DATA_LIST);
+        if (project == null) {
+            // Fail with a clear message instead of a bare NullPointerException on getCode().
+            throw new IllegalArgumentException("Project not found: " + projectName);
+        }
+        long projectCode = project.getCode();
+        Map<String, Object> verifyProcessDefinitionName =
+                processDefinitionService.verifyProcessDefinitionName(userPythonGateway, projectCode, name);
+
+        // NOTE(review): this treats SUCCESS as "the definition already exists". If the service
+        // reports SUCCESS when the name is still free, the two branches are swapped -- confirm.
+        if (verifyProcessDefinitionName.get(Constants.STATUS) == Status.SUCCESS) {
+            // update process definition
+            // TODO not implemented yet: needs a way to resolve the existing definition's code.
+//            ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
+//            long code = processDefinition.getCode();
+//            Map<String, Object> result = processDefinitionService.updateProcessDefinition(userPythonGateway, projectCode, name, code, description, globalParams,
+//                    locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
+//            return code;
+            return 1L;
+        } else {
+            // create process definition
+            Map<String, Object> result = processDefinitionService.createProcessDefinition(userPythonGateway, projectCode, name, description, globalParams,
+                    locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
+            ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
+            return processDefinition.getCode();
+        }
+    }
+
+    /**
+     * Forward all arguments, in the same order, to {@code executorService.execProcessInstance},
+     * acting as the built-in gateway user. The value returned by the service is discarded.
+     *
+     * @param projectCode               project code
+     * @param processDefinitionCode     process definition code
+     * @param cronTime                  cron time
+     * @param commandType               command type
+     * @param failureStrategy           failure strategy
+     * @param startNodeList             start node list
+     * @param taskDependType            task depend type
+     * @param warningType               warning type
+     * @param warningGroupId            warning group id
+     * @param runMode                   run mode
+     * @param processInstancePriority   process instance priority
+     * @param workerGroup               worker group
+     * @param environmentCode           environment code
+     * @param timeout                   timeout
+     * @param startParams               start params
+     * @param expectedParallelismNumber expected parallelism number
+     */
+    public void execProcessInstance(long projectCode,
+                                    long processDefinitionCode,
+                                    String cronTime,
+                                    CommandType commandType,
+                                    FailureStrategy failureStrategy,
+                                    String startNodeList,
+                                    TaskDependType taskDependType,
+                                    WarningType warningType,
+                                    int warningGroupId,
+                                    RunMode runMode,
+                                    Priority processInstancePriority,
+                                    String workerGroup,
+                                    Long environmentCode,
+                                    Integer timeout,
+                                    Map<String, String> startParams,
+                                    Integer expectedParallelismNumber) {
+        executorService.execProcessInstance(
+                userPythonGateway,
+                // what to run
+                projectCode, processDefinitionCode, cronTime,
+                // how to run it
+                commandType, failureStrategy, startNodeList, taskDependType,
+                // warning settings
+                warningType, warningGroupId,
+                // scheduling settings
+                runMode, processInstancePriority, workerGroup, environmentCode,
+                timeout, startParams, expectedParallelismNumber);
+    }
+
+
+    // side object
+    /**
+     * Delegate to {@code projectService.createProject} as the built-in gateway user.
+     *
+     * @param name project name
+     * @param desc project description
+     * @return the result map returned by the project service
+     */
+    public Map<String, Object> createProject(String name, String desc) {
+        final Map<String, Object> created = projectService.createProject(userPythonGateway, name, desc);
+        return created;
+    }
+
+    /**
+     * Delegate to {@code tenantService.createTenant} as the built-in gateway user, passing the id
+     * of the built-in gateway queue.
+     *
+     * @param tenantCode tenant code
+     * @param desc       tenant description
+     * @return the result map returned by the tenant service
+     * @throws Exception propagated from the tenant service
+     */
+    public Map<String, Object> createTenant(String tenantCode, String desc) throws Exception {
+        final Map<String, Object> created =
+                tenantService.createTenant(userPythonGateway, tenantCode, queuePythonGateway.getId(), desc);
+        return created;
+    }
+
+    @PostConstruct
+    public void run() {
+        PythonGatewayServer app = new PythonGatewayServer();
+        // app is now the gateway.entry_point
+        // GatewayServer server = new GatewayServer(app,25334); to specific 
port
+        GatewayServer server = new GatewayServer(app);

Review comment:
       ```suggestion
           GatewayServer server = new GatewayServer(this);
   ```
   
   Hi @caishunfeng, next time you want to suggest a change in a PR, maybe you
could use GitHub's [reviewing changes][1] feature directly, as this comment does. It makes
the code review more direct. What's more, when the PR author accepts your suggestion, you
become a [co-author][2] once the PR is merged.
   
   [1]: 
https://docs.github.com/en/github/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/reviewing-proposed-changes-in-a-pull-request
   [2]: 
https://docs.github.com/en/github/committing-changes-to-your-project/creating-and-editing-commits/creating-a-commit-with-multiple-authors




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to