turbaszek commented on a change in pull request #14521: URL: https://github.com/apache/airflow/pull/14521#discussion_r619614212
########## File path: airflow/providers/asana/hooks/asana.py ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Connect to Asana.""" + +from typing import Any, Dict + +from asana import Client +from cached_property import cached_property + +from airflow.hooks.base import BaseHook + + +class AsanaHook(BaseHook): + """Wrapper around Asana Python client library.""" + + conn_name_attr = "asana_conn_id" + default_conn_name = "asana_default" + conn_type = "asana" + hook_name = "Asana" + + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.connection = self.get_connection(conn_id) + extras = self.connection.extra_dejson + self.workspace = extras.get("extra__asana__workspace") or None + self.project = extras.get("extra__asana__project") or None + + def get_conn(self) -> Client: + return self.client + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__asana__workspace": 
StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()), + "extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict: + """Returns custom field behaviour""" + return { + "hidden_fields": ["port", "host", "login", "schema"], + "relabeling": {}, + "placeholders": { + "password": "Asana personal access token", + "extra__asana__workspace": "Asana workspace gid", + "extra__asana__project": "Asana project gid", + }, + } + + @cached_property + def client(self) -> Client: + """Instantiates python-asana Client""" + if not self.connection.password: + raise ValueError( + "Asana connection password must contain a personal access token: " + "https://developers.asana.com/docs/personal-access-token" + ) + + return Client.access_token(self.connection.password) + + def create_task(self, task_name: str, params: dict) -> dict: + """ + Creates an Asana task. For a complete list of possible parameters, see + https://developers.asana.com/docs/create-a-task Review comment: Could you please describe the arguments ``` :param task_name: lorem ipsu... ``` ########## File path: airflow/providers/asana/hooks/asana.py ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Connect to Asana.""" + +from typing import Any, Dict + +from asana import Client +from cached_property import cached_property + +from airflow.hooks.base import BaseHook + + +class AsanaHook(BaseHook): + """Wrapper around Asana Python client library.""" + + conn_name_attr = "asana_conn_id" + default_conn_name = "asana_default" + conn_type = "asana" + hook_name = "Asana" + + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.connection = self.get_connection(conn_id) + extras = self.connection.extra_dejson + self.workspace = extras.get("extra__asana__workspace") or None + self.project = extras.get("extra__asana__project") or None + + def get_conn(self) -> Client: + return self.client + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()), + "extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict: + """Returns custom field behaviour""" + return { + "hidden_fields": ["port", "host", "login", "schema"], + "relabeling": {}, + "placeholders": { + "password": "Asana personal access token", + "extra__asana__workspace": "Asana workspace gid", + "extra__asana__project": "Asana project gid", + }, + } + + @cached_property + def client(self) -> Client: + """Instantiates python-asana Client""" + if not self.connection.password: + raise ValueError( + "Asana connection password must contain a personal access token: " + "https://developers.asana.com/docs/personal-access-token" + ) + + return 
Client.access_token(self.connection.password) + + def create_task(self, task_name: str, params: dict) -> dict: + """ + Creates an Asana task. For a complete list of possible parameters, see + https://developers.asana.com/docs/create-a-task + """ + merged_params = self._merge_create_task_parameters(task_name, params) + self._validate_create_task_parameters(merged_params) + response = self.client.tasks.create(params=merged_params) # pylint: disable=no-member + return response + + def _merge_create_task_parameters(self, task_name: str, task_params: dict) -> dict: + """Merge create_task parameters with default params from the connection.""" + merged_params = {"name": task_name} + if self.project: + merged_params["projects"] = [self.project] + elif self.workspace and not (task_params and ("projects" in task_params)): + merged_params["workspace"] = self.workspace + if task_params: + merged_params.update(task_params) + return merged_params + + @staticmethod + def _validate_create_task_parameters(params: dict) -> None: + """Check that user provided minimal create parameters.""" + required_parameters = {"workspace", "projects", "parent"} + if required_parameters.isdisjoint(params): + raise ValueError( + f"You must specify at least one of {required_parameters} in the create_task parameters" + ) + + def delete_task(self, task_id: str) -> dict: + """Deletes an Asana task.""" + response = self.client.tasks.delete_task(task_id) # pylint: disable=no-member + return response + + def find_task(self, params: dict) -> list: + """ + Retrieves a list of Asana tasks that match search parameters. For a complete list of parameters, + see https://bit.ly/3uIqMj0 Review comment: Please do use final link instead of tracking one 😉 ########## File path: airflow/providers/asana/hooks/asana.py ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Connect to Asana.""" + +from typing import Any, Dict + +from asana import Client +from cached_property import cached_property + +from airflow.hooks.base import BaseHook + + +class AsanaHook(BaseHook): + """Wrapper around Asana Python client library.""" + + conn_name_attr = "asana_conn_id" + default_conn_name = "asana_default" + conn_type = "asana" + hook_name = "Asana" + + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.connection = self.get_connection(conn_id) + extras = self.connection.extra_dejson + self.workspace = extras.get("extra__asana__workspace") or None + self.project = extras.get("extra__asana__project") or None + + def get_conn(self) -> Client: + return self.client + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()), + "extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict: + 
"""Returns custom field behaviour""" + return { + "hidden_fields": ["port", "host", "login", "schema"], + "relabeling": {}, + "placeholders": { + "password": "Asana personal access token", + "extra__asana__workspace": "Asana workspace gid", + "extra__asana__project": "Asana project gid", + }, + } + + @cached_property + def client(self) -> Client: + """Instantiates python-asana Client""" + if not self.connection.password: + raise ValueError( + "Asana connection password must contain a personal access token: " + "https://developers.asana.com/docs/personal-access-token" + ) + + return Client.access_token(self.connection.password) + + def create_task(self, task_name: str, params: dict) -> dict: + """ + Creates an Asana task. For a complete list of possible parameters, see + https://developers.asana.com/docs/create-a-task + """ + merged_params = self._merge_create_task_parameters(task_name, params) + self._validate_create_task_parameters(merged_params) + response = self.client.tasks.create(params=merged_params) # pylint: disable=no-member + return response + + def _merge_create_task_parameters(self, task_name: str, task_params: dict) -> dict: + """Merge create_task parameters with default params from the connection.""" + merged_params = {"name": task_name} + if self.project: + merged_params["projects"] = [self.project] + elif self.workspace and not (task_params and ("projects" in task_params)): + merged_params["workspace"] = self.workspace + if task_params: + merged_params.update(task_params) + return merged_params + + @staticmethod + def _validate_create_task_parameters(params: dict) -> None: + """Check that user provided minimal create parameters.""" + required_parameters = {"workspace", "projects", "parent"} + if required_parameters.isdisjoint(params): + raise ValueError( + f"You must specify at least one of {required_parameters} in the create_task parameters" + ) + + def delete_task(self, task_id: str) -> dict: + """Deletes an Asana task.""" + response = 
self.client.tasks.delete_task(task_id) # pylint: disable=no-member + return response + + def find_task(self, params: dict) -> list: + """ + Retrieves a list of Asana tasks that match search parameters. For a complete list of parameters, + see https://bit.ly/3uIqMj0 Review comment: Please do use final link instead of tracking one 😉 ########## File path: airflow/providers/asana/hooks/asana.py ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Connect to Asana.""" + +from typing import Any, Dict + +from asana import Client +from cached_property import cached_property + +from airflow.hooks.base import BaseHook + + +class AsanaHook(BaseHook): + """Wrapper around Asana Python client library.""" + + conn_name_attr = "asana_conn_id" + default_conn_name = "asana_default" + conn_type = "asana" + hook_name = "Asana" + + def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.connection = self.get_connection(conn_id) + extras = self.connection.extra_dejson + self.workspace = extras.get("extra__asana__workspace") or None + self.project = extras.get("extra__asana__project") or None + + def get_conn(self) -> Client: + return self.client + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__asana__workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()), + "extra__asana__project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict: + """Returns custom field behaviour""" + return { + "hidden_fields": ["port", "host", "login", "schema"], + "relabeling": {}, + "placeholders": { + "password": "Asana personal access token", + "extra__asana__workspace": "Asana workspace gid", + "extra__asana__project": "Asana project gid", + }, + } + + @cached_property + def client(self) -> Client: + """Instantiates python-asana Client""" + if not self.connection.password: + raise ValueError( + "Asana connection password must contain a personal access token: " + "https://developers.asana.com/docs/personal-access-token" + ) + + return Client.access_token(self.connection.password) + + def create_task(self, task_name: str, params: dict) -> 
dict: + """ + Creates an Asana task. For a complete list of possible parameters, see + https://developers.asana.com/docs/create-a-task + """ + merged_params = self._merge_create_task_parameters(task_name, params) + self._validate_create_task_parameters(merged_params) + response = self.client.tasks.create(params=merged_params) # pylint: disable=no-member + return response + + def _merge_create_task_parameters(self, task_name: str, task_params: dict) -> dict: + """Merge create_task parameters with default params from the connection.""" + merged_params = {"name": task_name} + if self.project: + merged_params["projects"] = [self.project] + elif self.workspace and not (task_params and ("projects" in task_params)): + merged_params["workspace"] = self.workspace + if task_params: + merged_params.update(task_params) + return merged_params + + @staticmethod + def _validate_create_task_parameters(params: dict) -> None: + """Check that user provided minimal create parameters.""" + required_parameters = {"workspace", "projects", "parent"} + if required_parameters.isdisjoint(params): + raise ValueError( + f"You must specify at least one of {required_parameters} in the create_task parameters" + ) + + def delete_task(self, task_id: str) -> dict: + """Deletes an Asana task.""" + response = self.client.tasks.delete_task(task_id) # pylint: disable=no-member + return response + + def find_task(self, params: dict) -> list: + """ + Retrieves a list of Asana tasks that match search parameters. 
For a complete list of parameters, + see https://bit.ly/3uIqMj0 + """ + merged_params = self._merge_find_task_parameters(params) + self._validate_find_task_parameters(merged_params) + response = self.client.tasks.find_all(params=merged_params) # pylint: disable=no-member + return list(response) + + def _merge_find_task_parameters(self, search_parameters: dict) -> dict: + """Merge find_task parameters with default params from the connection.""" + merged_params = {} + if self.project: + merged_params["project"] = self.project + # Only use default workspace if user did not provide a project id + elif self.workspace and not (search_parameters and ("project" in search_parameters)): + merged_params["workspace"] = self.workspace + if search_parameters: + merged_params.update(search_parameters) + return merged_params + + @staticmethod + def _validate_find_task_parameters(params: dict) -> None: + """Check that user provided minimal search parameters.""" + one_of_list = {"project", "section", "tag", "user_task_list"} + both_of_list = {"assignee", "workspace"} + contains_both = both_of_list.issubset(params) + contains_one = not one_of_list.isdisjoint(params) + if not (contains_both or contains_one): + raise ValueError( + f"You must specify at least one of {one_of_list} " + f"or both of {both_of_list} in the find_task parameters." + ) + + def update_task(self, task_id: str, params: dict) -> dict: + """ + Updates an existing Asana task. For a complete list of possible parameters, see + https://developers.asana.com/docs/update-a-task + """ + response = self.client.tasks.update(task_id, params) # pylint: disable=no-member + return response + + def create_project(self, params: dict) -> dict: Review comment: Please describe the `params` argument in docstring in all methods. 
The name is vague, and having a docstring will help users understand what should be passed there ########## File path: airflow/providers/asana/operators/asana_tasks.py ########## @@ -0,0 +1,175 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Dict, Optional + +from airflow.models import BaseOperator +from airflow.providers.asana.hooks.asana import AsanaHook +from airflow.utils.decorators import apply_defaults + + +class AsanaCreateTaskOperator(BaseOperator): + """ + This operator can be used to create Asana tasks. For more information on + Asana optional task parameters, see https://developers.asana.com/docs/create-a-task + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AsanaCreateTaskOperator` + + :param conn_id: The Asana connection to use. + :type conn_id: str + :param name: Name of the Asana task. + :type name: str + :param task_parameters: Any of the optional task creation parameters. + See https://developers.asana.com/docs/create-a-task for a complete list. + You must specify at least one of 'workspace', 'parent', or 'projects' + either here or in the connection.
+ :type task_parameters: dict + """ + + @apply_defaults + def __init__( + self, + *, + conn_id: str, + name: str, + task_parameters: Optional[dict] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.conn_id = conn_id + self.name = name + self.task_parameters = task_parameters + + def execute(self, context: Dict) -> str: + hook = AsanaHook(conn_id=self.conn_id) + response = hook.create_task(self.name, self.task_parameters) + self.log.info(response) + return response["gid"] + + +class AsanaUpdateTaskOperator(BaseOperator): + """ + This operator can be used to update Asana tasks. + For more information on Asana optional task parameters, see + https://developers.asana.com/docs/update-a-task + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AsanaUpdateTaskOperator` + + :param conn_id: The Asana connection to use. + :type conn_id: str + :param asana_task_gid: Asana task ID to update + :type asana_task_gid: str + :param task_parameters: Any task parameters that should be updated. + See https://developers.asana.com/docs/update-a-task for a complete list. + :type task_update_parameters: dict + """ + + @apply_defaults + def __init__( + self, + *, + conn_id: str, + asana_task_gid: str, + task_parameters: dict, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.conn_id = conn_id + self.asana_task_gid = asana_task_gid + self.task_parameters = task_parameters + + def execute(self, context: Dict) -> None: + hook = AsanaHook(conn_id=self.conn_id) + response = hook.update_task(self.asana_task_gid, self.task_parameters) + self.log.info(response) + + +class AsanaDeleteTaskOperator(BaseOperator): + """ + This operator can be used to delete Asana tasks. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AsanaDeleteTaskOperator` + + :param conn_id: The Asana connection to use. 
+ :type conn_id: str + :param asana_task_gid: Asana Task ID to delete. + :type asana_task_gid: str + """ + + @apply_defaults + def __init__( + self, + *, + conn_id: str, + asana_task_gid: str, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.conn_id = conn_id + self.asana_task_gid = asana_task_gid + + def execute(self, context: Dict) -> None: + hook = AsanaHook(conn_id=self.conn_id) + response = hook.delete_task(self.asana_task_gid) Review comment: What will happen if the task does not exist? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
