zhongjiajie commented on issue #138: URL: https://github.com/apache/dolphinscheduler-sdk-python/issues/138#issuecomment-1933416242
> I am able to create workflows normally, but according to my analysis, the process of creating a workflow should automatically create or update the tasks in the workflow. > > However, the Python SDK currently creates workflows but does not correctly create their tasks (at least they are not shown in the UI, and the HTTP API cannot retrieve tasks created this way either). > > As a result, workflows consisting of single tasks with no dependencies run normally, but workflows with task dependencies cannot be created properly. I suspect the database cannot read the relationships between these tasks, which leads to a restart loop. > > Last night, I attempted to replace the `workflow.submit` function with a custom `submit()` function that uses the HTTP API. However, the API list in `swagger-ui/index.html?urls.primaryName=v1` is still not functioning properly. Ultimately, I reverse-engineered the API through the UI and successfully worked around this issue. Currently, my program is running smoothly. > > The replacement submit function is posted below; I hope it's helpful. 
> > ``` > > import json > > > def get_task_code(project_code, num=1) -> List[int]: > ret = auth_request( > requests.get, > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition/gen-task-codes?genNum={num}", > ) > return json.loads(ret.content)["data"] > > > def get_project(name: str): > ret = requests.get( > "http://172.32.2.55:12345/dolphinscheduler/projects/list", > headers={ > "token": AUTH_TOKEN, > "Accept": "application/json", > }, > ) > > for i in json.loads(ret.content)["data"]: > if name == i["name"]: > return i > return None > > > def get_workflow(project_code: int, name: str): > ret = requests.get( > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/list", > headers={ > "token": "25fc3ddf36991fe6f95ce6bdd0d36448", > "Accept": "application/json", > }, > ) > for workflow in json.loads(ret.content)["data"]: > if workflow["processDefinition"]["name"] == name: > return workflow["processDefinition"] > return None > > > def get_tasks(project_code: int): > ret = requests.get( > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/task-definition?pageNo=1&pageSize=30", > headers={ > "token": "25fc3ddf36991fe6f95ce6bdd0d36448", > "Accept": "application/json", > }, > ) > return json.loads(ret.content)["data"]["totalList"] > > > def get_resource_id(name: str): > ret = auth_request( > requests.get, > "http://172.32.2.55:12345/dolphinscheduler/datasources/list?type=SSH&testFlag=0", > ) > # breakpoint() > > for resource in json.loads(ret.content)["data"]: > if resource["name"] == name: > return resource["id"] > raise KeyError() > > > def release(project_code, workflow_code): > auth_request( > requests.post, > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project_code}/process-definition/{workflow_code}/release", > ) > > > def submit(workflow: Workflow, online=True): > > workflow_def = workflow.get_define() > > project = get_project(workflow_def["project"]) > project_code = 
project["code"] > > url_workflow = get_workflow(project_code, name=workflow_def["name"]) > # [{"taskCode":12534793670368,"x":160,"y":74},{"taskCode":12535087404769,"x":135.7613525390625,"y":250.74148559570312}] > > tasks = get_tasks(project_code) > > task_codes = set([task["taskCode"] for task in tasks]) > locations = [] > > x_offset = 100 > y_offset = 100 > > task_code_map = {} > for task in workflow_def["taskDefinitionJson"]: # type: dict > task.setdefault("createTime", datetime.now().strftime("%Y-%m-%d %H:%M:%S")) > task.setdefault("updateTime", datetime.now().strftime("%Y-%m-%d %H:%M:%S")) > task.setdefault("projectName", project["name"]) > task.setdefault("projectCode", project["code"]) > task.setdefault("operator", 1) > task.setdefault("modifyBy", None) > task["delayTime"] = str(task["delayTime"]) > task["description"] = str(task["description"]) > task["failRetryTimes"] = str(task["failRetryInterval"]) > task["failRetryInterval"] = str(task["failRetryInterval"]) > if task.get("environmentCode", None) is None: > task["environmentCode"] = -1 > if task.get("timeoutNotifyStrategy", None) is None: > task["timeoutNotifyStrategy"] = "" > task.setdefault("userName", None) > task.setdefault("userId", 1) > task.setdefault("taskParamList", []) > task.setdefault("taskParamMap", {}) > task.setdefault("taskExecuteType", "BATCH") > > # locations.append( > # { > # "taskCode": task["code"], > # "x": x_offset, > # "y": y_offset, > # } > # ) > x_offset += 30 > y_offset += 30 > > # not needed > # if task["code"] not in task_codes: > # gen_task_code = get_task_code(project_code)[0] > # task_code_map.setdefault(task["code"], gen_task_code) > # task["code"] = gen_task_code > # ret = create_single_task(project_code, url_workflow["code"], task) > > for relation in workflow_def["taskRelationJson"]: > relation["preTaskCode"] = task_code_map.get( > relation["preTaskCode"], relation["preTaskCode"] > ) > relation["postTaskCode"] = task_code_map.get( > relation["postTaskCode"], 
relation["postTaskCode"] > ) > > data = { > "taskDefinitionJson": json.dumps(workflow_def["taskDefinitionJson"]), > "taskRelationJson": json.dumps(workflow_def["taskRelationJson"]), > "locations": "", > "name": workflow_def["name"], > "executionType": workflow_def["executionType"], > "description": workflow_def["description"], > "globalParams": json.dumps(workflow.param_json), > "timeout": workflow_def["timeout"], > "releaseState": "ONLINE" if online else "OFFLINE", > } > > assert project is not None, workflow_def["project"] > workflow = get_workflow(project["code"], workflow_def["name"]) > > if workflow is None: > # create > ret = auth_request( > requests.post, > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition", > data=data, > ) > else: > if workflow_def["releaseState"] == 1: > release(project_code, workflow["code"]) > # update > ret = auth_request( > requests.put, > f"http://172.32.2.55:12345/dolphinscheduler/projects/{project['code']}/process-definition/{workflow['code']}", > data=data, > ) > print(json.loads(ret.content)) > ``` Could you share which kinds of tasks you use in pydolphinscheduler? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
