sailist commented on issue #138:
URL:
https://github.com/apache/dolphinscheduler-sdk-python/issues/138#issuecomment-1933332694
I am able to create workflows normally, but according to my analysis, the
process of creating a workflow should automatically create or update tasks in
the workflow.
However, currently the Python SDK creates workflows but does not correctly
create their tasks (at least they do not show up in the UI, and the HTTP API
cannot retrieve tasks created this way either).
This results in workflows with single tasks that have no dependencies being
able to run normally, but workflows with task dependencies cannot be properly
created. The database cannot read the relationships of these tasks (I guess),
leading to a restart loop.
Last night, I attempted to replace the `workflow.submit` function with my
custom `submit()` function that uses the HTTP API. However, the API list in
`swagger-ui/index.html?urls.primaryName=v1` was still not functioning properly.
Ultimately, I reverse-engineered the API through the UI interface and
successfully worked around this issue. Currently my program is running smoothly.
the replaced submit function is posted here, hoping it's helpful.
```
import json
def get_task_code(project_code, num=1) -> List[int]:
    """Ask the server to generate *num* fresh task codes for the project.

    Uses the gen-task-codes endpoint and returns the list of codes from the
    response payload's "data" field.
    """
    url = (
        f"http://172.32.2.55:12345/dolphinscheduler/projects/"
        f"{project_code}/task-definition/gen-task-codes?genNum={num}"
    )
    response = auth_request(requests.get, url)
    payload = json.loads(response.content)
    return payload["data"]
def get_project(name: str):
    """Look up a project by exact name via the projects/list endpoint.

    Returns the project dict from the response "data" list, or None when no
    project with that name exists.
    """
    response = requests.get(
        "http://172.32.2.55:12345/dolphinscheduler/projects/list",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    projects = json.loads(response.content)["data"]
    return next((p for p in projects if p["name"] == name), None)
def get_workflow(project_code: int, name: str):
    """Return the processDefinition dict of the workflow named *name*.

    Scans the project's process-definition/list endpoint and returns the
    matching "processDefinition" entry, or None when no workflow matches.

    Fix: the auth token was a hard-coded credential string; use the shared
    module-level AUTH_TOKEN like get_project does (also avoids leaking a
    secret in the source).
    """
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    for workflow in json.loads(ret.content)["data"]:
        if workflow["processDefinition"]["name"] == name:
            return workflow["processDefinition"]
    return None
def get_tasks(project_code: int):
    """Return the first page (up to 30) of task definitions in a project.

    NOTE(review): pagination is fixed at pageNo=1&pageSize=30, so projects
    with more than 30 tasks are only partially listed — confirm this is
    acceptable for the caller.

    Fix: the auth token was a hard-coded credential string; use the shared
    module-level AUTH_TOKEN like get_project does.
    """
    ret = requests.get(
        f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize=30",
        headers={
            "token": AUTH_TOKEN,
            "Accept": "application/json",
        },
    )
    return json.loads(ret.content)["data"]["totalList"]
def get_resource_id(name: str):
    """Return the id of the SSH datasource named *name*.

    NOTE(review): despite the function name, this queries the datasources
    endpoint (type=SSH), not the resource center — confirm the intent.

    Raises:
        KeyError: if no datasource with that name exists (fix: the original
        raised a bare KeyError() with no message, which made failures hard
        to diagnose; include the missing name).
    """
    ret = auth_request(
        requests.get,
        "http://172.32.2.55:12345/dolphinscheduler/datasources/list?type=SSH&testFlag=0",
    )
    for resource in json.loads(ret.content)["data"]:
        if resource["name"] == name:
            return resource["id"]
    raise KeyError(name)
def release(project_code, workflow_code):
    """POST to the release endpoint for the given workflow definition."""
    url = (
        f"http://172.32.2.55:12345/dolphinscheduler/projects/"
        f"{project_code}/process-definition/{workflow_code}/release"
    )
    auth_request(requests.post, url)
def submit(workflow: Workflow, online=True):
    """Create or update a workflow (and its tasks) via the HTTP API.

    Replacement for Workflow.submit: normalizes each task definition so the
    server accepts it, then POSTs (create) or PUTs (update) the
    process-definition with the full taskDefinitionJson/taskRelationJson
    payload so task relations are persisted.

    Fixes relative to the original:
    - ``failRetryTimes`` was mistakenly set from ``failRetryInterval``
      (copy-paste bug); it is now taken from ``failRetryTimes`` itself.
    - The ``assert project is not None`` guard ran *after* ``project["code"]``
      was already dereferenced; moved before first use.
    - ``get_workflow`` was called twice; it is now called once, and the
      result no longer shadows the ``workflow`` parameter (which is still
      needed for ``workflow.param_json``).
    - Dead locals (unused task-code set, location offsets, never-populated
      task_code_map) and commented-out code removed.

    Args:
        workflow: the pydolphinscheduler Workflow object to submit.
        online: release the definition as ONLINE (True) or OFFLINE (False).
    """
    workflow_def = workflow.get_define()
    project = get_project(workflow_def["project"])
    assert project is not None, workflow_def["project"]
    project_code = project["code"]

    # One timestamp for the whole batch; server expects "YYYY-mm-dd HH:MM:SS".
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    for task in workflow_def["taskDefinitionJson"]:  # type: dict
        task.setdefault("createTime", now)
        task.setdefault("updateTime", now)
        task.setdefault("projectName", project["name"])
        task.setdefault("projectCode", project_code)
        task.setdefault("operator", 1)
        task.setdefault("modifyBy", None)
        # The web UI sends these numeric fields as strings; mirror that.
        task["delayTime"] = str(task["delayTime"])
        task["description"] = str(task["description"])
        task["failRetryTimes"] = str(task["failRetryTimes"])
        task["failRetryInterval"] = str(task["failRetryInterval"])
        if task.get("environmentCode") is None:
            task["environmentCode"] = -1
        if task.get("timeoutNotifyStrategy") is None:
            task["timeoutNotifyStrategy"] = ""
        task.setdefault("userName", None)
        task.setdefault("userId", 1)
        task.setdefault("taskParamList", [])
        task.setdefault("taskParamMap", {})
        task.setdefault("taskExecuteType", "BATCH")

    data = {
        "taskDefinitionJson": json.dumps(workflow_def["taskDefinitionJson"]),
        "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]),
        # Empty locations: the UI lays nodes out itself when none are given.
        "locations": "",
        "name": workflow_def["name"],
        "executionType": workflow_def["executionType"],
        "description": workflow_def["description"],
        "globalParams": json.dumps(workflow.param_json),
        "timeout": workflow_def["timeout"],
        "releaseState": "ONLINE" if online else "OFFLINE",
    }

    existing = get_workflow(project_code, workflow_def["name"])
    if existing is None:
        # No definition with this name yet: create it.
        ret = auth_request(
            requests.post,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition",
            data=data,
        )
    else:
        # An ONLINE definition must be released (toggled) before updating.
        if workflow_def["releaseState"] == 1:
            release(project_code, existing["code"])
        ret = auth_request(
            requests.put,
            f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{existing['code']}",
            data=data,
        )
    print(json.loads(ret.content))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]