zhongjiajie commented on issue #138:
URL: 
https://github.com/apache/dolphinscheduler-sdk-python/issues/138#issuecomment-1933416242

   > I am able to create workflows normally, but according to my analysis, the 
process of creating a workflow should automatically create or update tasks in 
the workflow.
   > 
   > However, currently, the Python SDK creates workflows but does not 
correctly create tasks (at least they are not shown in the UI, and the HTTP API 
cannot retrieve tasks created this way, either).
   > 
   > This results in workflows with single tasks that have no dependencies 
being able to run normally, but workflows with task dependencies cannot be 
properly created. The database cannot read the relationships of these tasks (I 
guess), leading to a restart loop.
   > 
   > Last night, I attempted to replace the `workflow.submit` function with my 
custom `submit()` function that uses the HTTP API. However, the API list in 
`swagger-ui/index.html?urls.primaryName=v1` was still not functioning properly. 
Ultimately, I reverse-engineered the API through the UI interface and 
successfully worked around this issue. Currently my program is running smoothly.
   > 
   > the replaced submit function is posted here, hoping it's helpful.
   > 
   > ```
   > 
   > import json
   > 
   > 
   > def get_task_code(project_code, num=1) -> List[int]:
   >     ret = auth_request(
   >         requests.get,
   >         
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition/gen-task-codes?genNum={num}";,
   >     )
   >     return json.loads(ret.content)["data"]
   > 
   > 
   > def get_project(name: str):
   >     ret = requests.get(
   >         "http://172.32.2.55:12345/dolphinscheduler/projects/list";,
   >         headers={
   >             "token": AUTH_TOKEN,
   >             "Accept": "application/json",
   >         },
   >     )
   > 
   >     for i in json.loads(ret.content)["data"]:
   >         if name == i["name"]:
   >             return i
   >     return None
   > 
   > 
   > def get_workflow(project_code: int, name: str):
   >     ret = requests.get(
   >         
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list";,
   >         headers={
   >             "token": "25fc3ddf36991fe6f95ce6bdd0d36448",
   >             "Accept": "application/json",
   >         },
   >     )
   >     for workflow in json.loads(ret.content)["data"]:
   >         if workflow["processDefinition"]["name"] == name:
   >             return workflow["processDefinition"]
   >     return None
   > 
   > 
   > def get_tasks(project_code: int):
   >     ret = requests.get(
   >         
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize=30";,
   >         headers={
   >             "token": "25fc3ddf36991fe6f95ce6bdd0d36448",
   >             "Accept": "application/json",
   >         },
   >     )
   >     return json.loads(ret.content)["data"]["totalList"]
   > 
   > 
   > def get_resource_id(name: str):
   >     ret = auth_request(
   >         requests.get,
   >         
"http://172.32.2.55:12345/dolphinscheduler/datasources/list?type=SSH&testFlag=0";,
   >     )
   >     # breakpoint()
   > 
   >     for resource in json.loads(ret.content)["data"]:
   >         if resource["name"] == name:
   >             return resource["id"]
   >     raise KeyError()
   > 
   > 
   > def release(project_code, workflow_code):
   >     auth_request(
   >         requests.post,
   >         
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{workflow_code}/release";,
   >     )
   > 
   > 
   > def submit(workflow: Workflow, online=True):
   > 
   >     workflow_def = workflow.get_define()
   > 
   >     project = get_project(workflow_def["project"])
   >     project_code = project["code"]
   > 
   >     url_workflow = get_workflow(project_code, name=workflow_def["name"])
   >     # 
[{"taskCode":12534793670368,"x":160,"y":74},{"taskCode":12535087404769,"x":135.7613525390625,"y":250.74148559570312}]
   > 
   >     tasks = get_tasks(project_code)
   > 
   >     task_codes = set([task["taskCode"] for task in tasks])
   >     locations = []
   > 
   >     x_offset = 100
   >     y_offset = 100
   > 
   >     task_code_map = {}
   >     for task in workflow_def["taskDefinitionJson"]:  # type: dict
   >         task.setdefault("createTime", datetime.now().strftime("%Y-%m-%d 
%H:%M:%S"))
   >         task.setdefault("updateTime", datetime.now().strftime("%Y-%m-%d 
%H:%M:%S"))
   >         task.setdefault("projectName", project["name"])
   >         task.setdefault("projectCode", project["code"])
   >         task.setdefault("operator", 1)
   >         task.setdefault("modifyBy", None)
   >         task["delayTime"] = str(task["delayTime"])
   >         task["description"] = str(task["description"])
   >         task["failRetryTimes"] = str(task["failRetryInterval"])
   >         task["failRetryInterval"] = str(task["failRetryInterval"])
   >         if task.get("environmentCode", None) is None:
   >             task["environmentCode"] = -1
   >         if task.get("timeoutNotifyStrategy", None) is None:
   >             task["timeoutNotifyStrategy"] = ""
   >         task.setdefault("userName", None)
   >         task.setdefault("userId", 1)
   >         task.setdefault("taskParamList", [])
   >         task.setdefault("taskParamMap", {})
   >         task.setdefault("taskExecuteType", "BATCH")
   > 
   >         # locations.append(
   >         #     {
   >         #         "taskCode": task["code"],
   >         #         "x": x_offset,
   >         #         "y": y_offset,
   >         #     }
   >         # )
   >         x_offset += 30
   >         y_offset += 30
   > 
   >         # not needed
   >         # if task["code"] not in task_codes:
   >         #     gen_task_code = get_task_code(project_code)[0]
   >         #     task_code_map.setdefault(task["code"], gen_task_code)
   >         #     task["code"] = gen_task_code
   >         #     ret = create_single_task(project_code, url_workflow["code"], 
task)
   > 
   >     for relation in workflow_def["taskRelationJson"]:
   >         relation["preTaskCode"] = task_code_map.get(
   >             relation["preTaskCode"], relation["preTaskCode"]
   >         )
   >         relation["postTaskCode"] = task_code_map.get(
   >             relation["postTaskCode"], relation["postTaskCode"]
   >         )
   > 
   >     data = {
   >         "taskDefinitionJson": 
json.dumps(workflow_def["taskDefinitionJson"]),
   >         "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]),
   >         "locations": "",
   >         "name": workflow_def["name"],
   >         "executionType": workflow_def["executionType"],
   >         "description": workflow_def["description"],
   >         "globalParams": json.dumps(workflow.param_json),
   >         "timeout": workflow_def["timeout"],
   >         "releaseState": "ONLINE" if online else "OFFLINE",
   >     }
   > 
   >     assert project is not None, workflow_def["project"]
   >     workflow = get_workflow(project["code"], workflow_def["name"])
   > 
   >     if workflow is None:
   >         # create
   >         ret = auth_request(
   >             requests.post,
   >             
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition",
   >             data=data,
   >         )
   >     else:
   >         if workflow_def["releaseState"] == 1:
   >             release(project_code, workflow["code"])
   >         # update
   >         ret = auth_request(
   >             requests.put,
   >             
f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition/{workflow['code']}",
   >             data=data,
   >         )
   >     print(json.loads(ret.content))
   > ```
   
   could you share what kind of task you use in pydolphinscheduler?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to