vincbeck commented on code in PR #21863: URL: https://github.com/apache/airflow/pull/21863#discussion_r867176559
########## airflow/providers/amazon/aws/example_dags/example_quicksight.py: ########## @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator +from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor + +DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test") +INGESTION_WAITING_ID = os.getenv("INGESTION_WAITING_ID", "DemoDataSet_Ingestion_Waiting_Test") +INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test") + +with DAG( + dag_id="example_quicksight", + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=["example"], + catchup=False, +) as dag: + # Create and Start the QuickSight SPICE data ingestion + # and waits for its completion. + # [START howto_operator_quicksight] + quicksight_create_ingestion = QuickSightCreateIngestionOperator( Review Comment: Do we want the same example twice, once waiting and once non-waiting? In my opinion, one should be enough: the non-waiting operator, followed by the sensor which waits for completion. That's what we usually do. 
See Glue as an example: https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_glue.py#L90 ########## airflow/providers/amazon/aws/sensors/quicksight.py: ########## @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Optional + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook +from airflow.providers.amazon.aws.hooks.sts import StsHook +from airflow.sensors.base import BaseSensorOperator + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class QuickSightSensor(BaseSensorOperator): + """ + Watches for the status of an Amazon QuickSight Ingestion. + + :param data_set_id: ID of the dataset used in the ingestion. + :param ingestion_id: ID for the ingestion. + :param aws_conn_id: The Airflow connection used for AWS credentials. (templated) + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then the default boto3 configuration would be used (and must be + maintained on each worker node). 
+ """ + + def __init__( + self, + *, + data_set_id: str, + ingestion_id: str, + aws_conn_id: str = "aws_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.data_set_id = data_set_id + self.ingestion_id = ingestion_id + self.aws_conn_id = aws_conn_id + self.success_status = "COMPLETED" + self.errored_statuses = ("FAILED", "CANCELLED") + self.quicksight_hook: Optional[QuickSightHook] = None + self.sts_hook: Optional[StsHook] = None + + def poke(self, context: "Context"): + """ + Pokes until the QuickSight Ingestion has successfully finished. + + :param context: The task context during execution. + :return: True if it COMPLETED and False if not. + :rtype: bool + """ + quicksight_hook, sts_hook = self.get_hook() + self.log.info("Poking for Amazon QuickSight Ingestion ID: %s", self.ingestion_id) + aws_account_id = sts_hook.get_account_number() + quicksight_ingestion_state = quicksight_hook.get_status( + aws_account_id, self.data_set_id, self.ingestion_id + ) + self.log.info("QuickSight Status: %s", quicksight_ingestion_state) + if quicksight_ingestion_state in self.errored_statuses: + raise AirflowException("The QuickSight Ingestion failed!") + return quicksight_ingestion_state == self.success_status + + def get_hook(self): + """Returns a new or pre-existing QuickSightHook""" + if self.quicksight_hook and self.sts_hook: + return [self.quicksight_hook, self.sts_hook] + + self.quicksight_hook = QuickSightHook(aws_conn_id=self.aws_conn_id) + self.sts_hook = StsHook(aws_conn_id=self.aws_conn_id) + return [self.quicksight_hook, self.sts_hook] Review Comment: ```suggestion @cached_property def get_quicksight_hook(self): return QuickSightHook(aws_conn_id=self.aws_conn_id) @cached_property def get_sts_hook(self): return StsHook(aws_conn_id=self.aws_conn_id) ``` ########## airflow/providers/amazon/aws/example_dags/example_quicksight.py: ########## @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license 
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator +from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor + +DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test") +INGESTION_WAITING_ID = os.getenv("INGESTION_WAITING_ID", "DemoDataSet_Ingestion_Waiting_Test") +INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test") + +with DAG( + dag_id="example_quicksight", + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=["example"], + catchup=False, +) as dag: + # Create and Start the QuickSight SPICE data ingestion + # and waits for its completion. + # [START howto_operator_quicksight] Review Comment: ```suggestion # [START howto_operator_quicksight_create_ingestion] ``` ########## airflow/providers/amazon/aws/hooks/quicksight.py: ########## @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import time + +from botocore.exceptions import ClientError + +from airflow import AirflowException +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.sts import StsHook + + +class QuickSightHook(AwsBaseHook): + """ + Interact with Amazon QuickSight. + + Additional arguments (such as ``aws_conn_id``) may be specified and + are passed down to the underlying AwsBaseHook. + .. seealso:: + :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` + """ + + NON_TERMINAL_STATES = {"INITIALIZED", "QUEUED", "RUNNING"} + FAILED_STATES = {"FAILED"} + + def __init__(self, *args, **kwargs): + super().__init__(client_type="quicksight", *args, **kwargs) + + def create_ingestion( + self, + data_set_id: str, + ingestion_id: str, + ingestion_type: str, + wait_for_completion: bool = True, + check_interval: int = 30, + ): + """ + Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets + + :param data_set_id: ID of the dataset used in the ingestion. + :param ingestion_id: ID for the ingestion. + :param ingestion_type: Type of ingestion . 
"INCREMENTAL_REFRESH"|"FULL_REFRESH" + :param wait_for_completion: if the program should keep running until job finishes + :param check_interval: the time interval in seconds which the operator + will check the status of QuickSight Ingestion + :return: Returns descriptive information about the created data ingestion + having Ingestion ARN, HTTP status, ingestion ID and ingestion status. + :rtype: Dict + """ + + self.log.info("Creating QuickSight Ingestion for data set id %s.", data_set_id) + quicksight_client = self.get_conn() + try: + sts_hook = StsHook() Review Comment: Do we want to create a hook every time we call this method? Shouldn't we use something like [cached_property](https://docs.python.org/3/library/functools.html#functools.cached_property) instead? ########## airflow/providers/amazon/aws/example_dags/example_quicksight.py: ########## @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator +from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor + +DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test") +INGESTION_WAITING_ID = os.getenv("INGESTION_WAITING_ID", "DemoDataSet_Ingestion_Waiting_Test") +INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test") + +with DAG( + dag_id="example_quicksight", + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=["example"], + catchup=False, +) as dag: + # Create and Start the QuickSight SPICE data ingestion + # and waits for its completion. + # [START howto_operator_quicksight] + quicksight_create_ingestion = QuickSightCreateIngestionOperator( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_WAITING_ID, + task_id="sample_quicksight_dag", + ) + quicksight_create_ingestion + # [END howto_operator_quicksight] Review Comment: ```suggestion # [END howto_operator_quicksight_create_ingestion] ``` ########## airflow/providers/amazon/aws/example_dags/example_quicksight.py: ########## @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator +from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor + +DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test") +INGESTION_WAITING_ID = os.getenv("INGESTION_WAITING_ID", "DemoDataSet_Ingestion_Waiting_Test") +INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test") + +with DAG( + dag_id="example_quicksight", + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=["example"], + catchup=False, +) as dag: + # Create and Start the QuickSight SPICE data ingestion + # and waits for its completion. + # [START howto_operator_quicksight] + quicksight_create_ingestion = QuickSightCreateIngestionOperator( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_WAITING_ID, + task_id="sample_quicksight_dag", + ) + quicksight_create_ingestion + # [END howto_operator_quicksight] + + # Create and Start the QuickSight SPICE data ingestion + # and does not wait for its completion + # [START howto_operator_quicksight_non_waiting] + quicksight_create_ingestion_no_waiting = QuickSightCreateIngestionOperator( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_NO_WAITING_ID, + wait_for_completion=False, + task_id="sample_quicksight_no_waiting_dag", + ) + + # The following task checks the status of the QuickSight SPICE ingestion + # job until it succeeds. 
+ quicksight_job_status = QuickSightSensor( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_NO_WAITING_ID, + task_id="check_quicksight_job_status", + ) Review Comment: ```suggestion # [START howto_sensor_quicksight] quicksight_job_status = QuickSightSensor( data_set_id=DATA_SET_ID, ingestion_id=INGESTION_NO_WAITING_ID, task_id="check_quicksight_job_status", ) # [END howto_sensor_quicksight] ``` ########## airflow/providers/amazon/aws/sensors/quicksight.py: ########## @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Optional + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook +from airflow.providers.amazon.aws.hooks.sts import StsHook +from airflow.sensors.base import BaseSensorOperator + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class QuickSightSensor(BaseSensorOperator): + """ + Watches for the status of an Amazon QuickSight Ingestion. + Review Comment: It would be great to reference the related section in the documentation. ```suggestion .. 
seealso:: For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:QuickSightSensor` ``` ########## airflow/providers/amazon/aws/sensors/quicksight.py: ########## @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Optional + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook +from airflow.providers.amazon.aws.hooks.sts import StsHook +from airflow.sensors.base import BaseSensorOperator + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class QuickSightSensor(BaseSensorOperator): + """ + Watches for the status of an Amazon QuickSight Ingestion. + + :param data_set_id: ID of the dataset used in the ingestion. + :param ingestion_id: ID for the ingestion. + :param aws_conn_id: The Airflow connection used for AWS credentials. (templated) + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then the default boto3 configuration would be used (and must be + maintained on each worker node). 
+ """ + + def __init__( + self, + *, + data_set_id: str, + ingestion_id: str, + aws_conn_id: str = "aws_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.data_set_id = data_set_id + self.ingestion_id = ingestion_id + self.aws_conn_id = aws_conn_id + self.success_status = "COMPLETED" + self.errored_statuses = ("FAILED", "CANCELLED") + self.quicksight_hook: Optional[QuickSightHook] = None + self.sts_hook: Optional[StsHook] = None + + def poke(self, context: "Context"): + """ + Pokes until the QuickSight Ingestion has successfully finished. + + :param context: The task context during execution. + :return: True if it COMPLETED and False if not. + :rtype: bool + """ + quicksight_hook, sts_hook = self.get_hook() Review Comment: ```suggestion quicksight_hook = self.get_quicksight_hook() sts_hook = self.get_sts_hook() ``` ########## docs/apache-airflow-providers-amazon/operators/quicksight.rst: ########## @@ -0,0 +1,73 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Amazon QuickSight Operators +======================================== + +Prerequisite Tasks +------------------ + +.. 
include:: _partials/prerequisite_tasks.rst + +Overview +-------- + +Airflow to Amazon QuickSight integration allows users to create and start the SPICE ingestion for dataset. + + - :class:`~airflow.providers.amazon.aws.operators.quicksight.QuickSightCreateIngestionOperator` + - :class:`~airflow.providers.amazon.aws.sensor.quicksight.QuickSightSensor` + +Purpose +""""""" + +This example DAG ``example_quicksight.py`` uses ``QuickSightCreateIngestionOperator`` for +creating and starting the SPICE ingestion for the dataset configured to use SPICE. In the example, +we created two ingestions. One of the ingestions waits for the SPICE ingestion to complete while +other ingestion does not wait for completion and uses ``QuickSightSensor`` to check for ingestion +status until it completes + + +Defining tasks +"""""""""""""" + +In the following code we create and start a QuickSight SPICE ingestion for the dataset and wait +for its completion. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_quicksight] + :end-before: [END howto_operator_quicksight] + +In the below example, we create and start the SPICE ingestion but do not wait for completion. We use +sensor to poll for Ingestion status until it Completes. + + + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_quicksight_non_waiting] + :end-before: [END howto_operator_quicksight_non_waiting] + Review Comment: For the sake of being consistent with other AWS documentations, could you please create 2 sections: one for the operator and one for the sensor ```suggestion .. _howto/operator:QuickSightCreateIngestionOperator: Create an Amazon Quicksight ingestion """""""""""""""""""""""""""""""""""""""""" Text explaining what the operator does .. 
exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py :language: python :dedent: 4 :start-after: [START howto_operator_quicksight_create_ingestion] :end-before: [END howto_operator_quicksight_create_ingestion] .. _howto/sensor:QuickSightSensor: Amazon Quicksight sensor """"""""""""""""""""""""""""" Text explaining what the sensor does .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py :language: python :dedent: 4 :start-after: [START howto_sensor_quicksight] :end-before: [END howto_sensor_quicksight] ``` ########## airflow/providers/amazon/aws/hooks/quicksight.py: ########## @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import time + +from botocore.exceptions import ClientError + +from airflow import AirflowException +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.sts import StsHook + + +class QuickSightHook(AwsBaseHook): + """ + Interact with Amazon QuickSight. + + Additional arguments (such as ``aws_conn_id``) may be specified and + are passed down to the underlying AwsBaseHook. + .. 
seealso:: + :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` + """ + + NON_TERMINAL_STATES = {"INITIALIZED", "QUEUED", "RUNNING"} + FAILED_STATES = {"FAILED"} + + def __init__(self, *args, **kwargs): + super().__init__(client_type="quicksight", *args, **kwargs) + + def create_ingestion( + self, + data_set_id: str, + ingestion_id: str, + ingestion_type: str, + wait_for_completion: bool = True, + check_interval: int = 30, + ): + """ + Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets + + :param data_set_id: ID of the dataset used in the ingestion. + :param ingestion_id: ID for the ingestion. + :param ingestion_type: Type of ingestion . "INCREMENTAL_REFRESH"|"FULL_REFRESH" + :param wait_for_completion: if the program should keep running until job finishes + :param check_interval: the time interval in seconds which the operator + will check the status of QuickSight Ingestion + :return: Returns descriptive information about the created data ingestion + having Ingestion ARN, HTTP status, ingestion ID and ingestion status. + :rtype: Dict + """ + + self.log.info("Creating QuickSight Ingestion for data set id %s.", data_set_id) + quicksight_client = self.get_conn() + try: + sts_hook = StsHook() + aws_account_id = sts_hook.get_account_number() + create_ingestion_response = quicksight_client.create_ingestion( + DataSetId=data_set_id, + IngestionId=ingestion_id, + IngestionType=ingestion_type, + AwsAccountId=aws_account_id, + ) + # aws_account_id = boto3.client('sts').get_caller_identity()['Account'] + # print(aws_account_id) Review Comment: Remove? ########## airflow/providers/amazon/aws/example_dags/example_quicksight.py: ########## @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator +from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor + +DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test") +INGESTION_WAITING_ID = os.getenv("INGESTION_WAITING_ID", "DemoDataSet_Ingestion_Waiting_Test") +INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test") + +with DAG( + dag_id="example_quicksight", + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=["example"], + catchup=False, +) as dag: + # Create and Start the QuickSight SPICE data ingestion + # and waits for its completion. 
+ # [START howto_operator_quicksight] + quicksight_create_ingestion = QuickSightCreateIngestionOperator( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_WAITING_ID, + task_id="sample_quicksight_dag", + ) + quicksight_create_ingestion + # [END howto_operator_quicksight] + + # Create and Start the QuickSight SPICE data ingestion + # and does not wait for its completion + # [START howto_operator_quicksight_non_waiting] + quicksight_create_ingestion_no_waiting = QuickSightCreateIngestionOperator( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_NO_WAITING_ID, + wait_for_completion=False, + task_id="sample_quicksight_no_waiting_dag", + ) + + # The following task checks the status of the QuickSight SPICE ingestion + # job until it succeeds. + quicksight_job_status = QuickSightSensor( + data_set_id=DATA_SET_ID, + ingestion_id=INGESTION_NO_WAITING_ID, + task_id="check_quicksight_job_status", + ) Review Comment: Ideally we want to have 2 different sections in the documentation for the operator and the sensor. Thus, we need to have 2 flags: `howto_operator_quicksight_create_ingestion` and `howto_sensor_quicksight` ########## airflow/providers/amazon/aws/operators/quicksight.py: ########## @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Optional, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + +DEFAULT_CONN_ID = "aws_default" + + +class QuickSightCreateIngestionOperator(BaseOperator): + """ + Creates and starts a new SPICE ingestion for a dataset. + Also, helps to Refresh existing SPICE datasets + Review Comment: It would be great to reference the related section in the documentation. ```suggestion .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:QuickSightCreateIngestionOperator` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
