sailist commented on issue #138:
URL: 
https://github.com/apache/dolphinscheduler-sdk-python/issues/138#issuecomment-1933332694

   I am able to create workflows normally, but according to my analysis, the 
process of creating a workflow should automatically create or update tasks in 
the workflow. 
   
   However, currently, the Python SDK creates workflows but does not correctly 
   create tasks (at least, they do not show in the UI, and the HTTP API cannot 
   retrieve tasks created this way, either). 
   
   This results in workflows with single tasks that have no dependencies being 
able to run normally, but workflows with task dependencies cannot be properly 
created. The database cannot read the relationships of these tasks (I guess), 
leading to a restart loop.
   
   Last night, I attempted to replace the `workflow.submit` function with my 
   custom `submit()` function that uses the HTTP API. However, the API listed in 
   `swagger-ui/index.html?urls.primaryName=v1` was still not functioning properly. 
   Ultimately, I reverse-engineered the API through the UI and successfully 
   worked around this issue. Currently my program is running smoothly.
   
   The replacement `submit` function is posted here; I hope it's helpful.
   
   ```
   
   import json
   
   
   def get_task_code(project_code, num=1) -> List[int]:
       """Ask the server to generate *num* fresh task definition codes.

       :param project_code: DolphinScheduler project code to generate codes for.
       :param num: how many codes to request (default 1).
       :return: list of newly generated task codes.
       """
       response = auth_request(
           requests.get,
           f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition/gen-task-codes?genNum={num}",
       )
       payload = json.loads(response.content)
       return payload["data"]
   
   
   def get_project(name: str):
       """Return the project dict whose name equals *name*, or None.

       :param name: project name to look up.
       """
       response = requests.get(
           "http://172.32.2.55:12345/dolphinscheduler/projects/list",
           headers={
               "token": AUTH_TOKEN,
               "Accept": "application/json",
           },
       )
       projects = json.loads(response.content)["data"]
       # First project with a matching name wins; None if absent.
       return next((p for p in projects if p["name"] == name), None)
   
   
   def get_workflow(project_code: int, name: str):
       """Return the ``processDefinition`` dict for workflow *name*, or None.

       :param project_code: DolphinScheduler project code.
       :param name: workflow (process definition) name to look up.
       """
       ret = requests.get(
           f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list",
           headers={
               # FIX: use the shared AUTH_TOKEN instead of a hard-coded
               # (leaked) credential, consistent with get_project().
               "token": AUTH_TOKEN,
               "Accept": "application/json",
           },
       )
       for workflow in json.loads(ret.content)["data"]:
           if workflow["processDefinition"]["name"] == name:
               return workflow["processDefinition"]
       return None
   
   
   def get_tasks(project_code: int, page_size: int = 30):
       """Return the task definitions of a project (first page only).

       :param project_code: DolphinScheduler project code.
       :param page_size: page size of the query. The original hard-coded 30,
           which silently truncates projects with more tasks — raise this
           (or add paging) for larger projects.
       """
       ret = requests.get(
           f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize={page_size}",
           headers={
               # FIX: shared AUTH_TOKEN instead of a hard-coded credential,
               # consistent with get_project().
               "token": AUTH_TOKEN,
               "Accept": "application/json",
           },
       )
       return json.loads(ret.content)["data"]["totalList"]
   
   
   def get_resource_id(name: str, ds_type: str = "SSH"):
       """Return the id of the datasource named *name*.

       NOTE(review): despite the name, this queries the /datasources endpoint,
       not /resources — kept for caller compatibility.

       :param name: datasource name to look up.
       :param ds_type: datasource type filter (default "SSH", as before).
       :raises KeyError: if no datasource with that name exists.
       """
       ret = auth_request(
           requests.get,
           f"http://172.32.2.55:12345/dolphinscheduler/datasources/list?type={ds_type}&testFlag=0",
       )
       for resource in json.loads(ret.content)["data"]:
           if resource["name"] == name:
               return resource["id"]
       # FIX: include the missing name so the failure is diagnosable
       # (the original raised a bare KeyError()).
       raise KeyError(name)
   
   
   def release(project_code, workflow_code):
       """POST the release endpoint for the given workflow definition."""
       url = (
           f"http://172.32.2.55:12345/dolphinscheduler/projects/"
           f"{project_code}/process-definition/{workflow_code}/release"
       )
       auth_request(requests.post, url)
   
   
   def submit(workflow: Workflow, online=True):
       """Create or update *workflow* on the DolphinScheduler server over HTTP.

       Works around the SDK submit path by calling the ``process-definition``
       REST endpoints directly (payload shape reverse-engineered from the
       web UI).

       :param workflow: SDK Workflow whose definition is pushed to the server.
       :param online: when True the definition is submitted with releaseState
           ONLINE, otherwise OFFLINE.
       """
       workflow_def = workflow.get_define()

       project = get_project(workflow_def["project"])
       # FIX: fail early — the original asserted only after project["code"]
       # had already been dereferenced.
       assert project is not None, workflow_def["project"]
       project_code = project["code"]

       # Maps locally generated task codes to server-generated ones. Currently
       # never populated (server-side code generation turned out to be
       # unnecessary), but kept so the relation rewrite below still works if
       # re-coding is ever re-enabled.
       task_code_map = {}

       # One timestamp for the whole batch instead of per-field now() calls.
       now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       for task in workflow_def["taskDefinitionJson"]:  # type: dict
           task.setdefault("createTime", now)
           task.setdefault("updateTime", now)
           task.setdefault("projectName", project["name"])
           task.setdefault("projectCode", project["code"])
           task.setdefault("operator", 1)
           task.setdefault("modifyBy", None)
           # Stringify to match the form payload the web UI sends.
           task["delayTime"] = str(task["delayTime"])
           task["description"] = str(task["description"])
           # BUG FIX: the original copied failRetryInterval into
           # failRetryTimes, silently corrupting the retry count.
           task["failRetryTimes"] = str(task["failRetryTimes"])
           task["failRetryInterval"] = str(task["failRetryInterval"])
           if task.get("environmentCode") is None:
               task["environmentCode"] = -1
           if task.get("timeoutNotifyStrategy") is None:
               task["timeoutNotifyStrategy"] = ""
           task.setdefault("userName", None)
           task.setdefault("userId", 1)
           task.setdefault("taskParamList", [])
           task.setdefault("taskParamMap", {})
           task.setdefault("taskExecuteType", "BATCH")

       # Re-point relations at server-side codes (a no-op while
       # task_code_map stays empty).
       for relation in workflow_def["taskRelationJson"]:
           relation["preTaskCode"] = task_code_map.get(
               relation["preTaskCode"], relation["preTaskCode"]
           )
           relation["postTaskCode"] = task_code_map.get(
               relation["postTaskCode"], relation["postTaskCode"]
           )

       data = {
           "taskDefinitionJson": json.dumps(workflow_def["taskDefinitionJson"]),
           "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]),
           "locations": "",
           "name": workflow_def["name"],
           "executionType": workflow_def["executionType"],
           "description": workflow_def["description"],
           "globalParams": json.dumps(workflow.param_json),
           "timeout": workflow_def["timeout"],
           "releaseState": "ONLINE" if online else "OFFLINE",
       }

       # Renamed from the original's re-use of ``workflow``, which shadowed
       # the Workflow parameter. (The original also fetched this twice and
       # pulled an unused task list; both removed.)
       existing = get_workflow(project_code, workflow_def["name"])

       if existing is None:
           # No definition with this name yet: create it.
           ret = auth_request(
               requests.post,
               f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition",
               data=data,
           )
       else:
           if workflow_def["releaseState"] == 1:
               # Presumably toggles the definition offline so it can be
               # updated — TODO confirm against server behavior.
               release(project_code, existing["code"])
           # Update the existing definition in place.
           ret = auth_request(
               requests.put,
               f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{existing['code']}",
               data=data,
           )
       print(json.loads(ret.content))
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to