This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c149c341e Add Quicksight create ingestion Hook and Operator (#21863)
5c149c341e is described below

commit 5c149c341eb420fd5f8fc77534b1236ad8a9c6c1
Author: Harpreet Singh <[email protected]>
AuthorDate: Tue May 10 20:24:13 2022 +0530

    Add Quicksight create ingestion Hook and Operator (#21863)
    
    * Add Quicksight create ingestion Hook and Operator
    
    Co-authored-by: eladkal <[email protected]>
---
 .../amazon/aws/example_dags/example_quicksight.py  |  55 ++++++++
 airflow/providers/amazon/aws/hooks/quicksight.py   | 153 +++++++++++++++++++++
 airflow/providers/amazon/aws/hooks/sts.py          |  41 ++++++
 .../providers/amazon/aws/operators/quicksight.py   |  98 +++++++++++++
 airflow/providers/amazon/aws/sensors/quicksight.py |  96 +++++++++++++
 airflow/providers/amazon/provider.yaml             |  22 +++
 .../operators/quicksight.rst                       |  67 +++++++++
 docs/integration-logos/aws/[email protected] | Bin 0 -> 3484 bytes
 .../aws/[email protected]          | Bin 0 -> 4568 bytes
 docs/spelling_wordlist.txt                         |   1 +
 .../providers/amazon/aws/hooks/test_quicksight.py  | 141 +++++++++++++++++++
 tests/providers/amazon/aws/hooks/test_sts.py       |  25 ++++
 .../amazon/aws/operators/test_quicksight.py        |  58 ++++++++
 .../amazon/aws/sensors/test_quicksight.py          |  77 +++++++++++
 14 files changed, 834 insertions(+)
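
For context, the example DAG added by this commit starts the ingestion with wait_for_completion=False and then polls it with QuickSightSensor. A minimal sketch of the alternative, synchronous pattern, relying on the operator defaults introduced in this patch (wait_for_completion=True, check_interval=30), could look like the following; the DAG id, task id, and dataset/ingestion ids are illustrative placeholders only:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator

    with DAG(
        dag_id="example_quicksight_sync",  # illustrative DAG id
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Creates the SPICE ingestion and blocks until the hook reports COMPLETED,
        # polling every 30 seconds (the operator defaults added in this commit).
        ingest_and_wait = QuickSightCreateIngestionOperator(
            task_id="quicksight_create_ingestion_wait",  # illustrative task id
            data_set_id="DemoDataSet_Test",              # illustrative dataset id
            ingestion_id="DemoDataSet_Sync_Ingestion",   # illustrative ingestion id
        )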

diff --git a/airflow/providers/amazon/aws/example_dags/example_quicksight.py b/airflow/providers/amazon/aws/example_dags/example_quicksight.py
new file mode 100644
index 0000000000..dd46035daa
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_quicksight.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator
+from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor
+
+DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test")
+INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID", "DemoDataSet_Ingestion_No_Waiting_Test")
+
+with DAG(
+    dag_id="example_quicksight",
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=["example"],
+    catchup=False,
+) as dag:
+    # Create and start the QuickSight SPICE data ingestion
+    # without waiting for its completion
+    # [START howto_operator_quicksight_create_ingestion]
+    quicksight_create_ingestion_no_waiting = QuickSightCreateIngestionOperator(
+        data_set_id=DATA_SET_ID,
+        ingestion_id=INGESTION_NO_WAITING_ID,
+        wait_for_completion=False,
+        task_id="sample_quicksight_no_waiting_dag",
+    )
+    # [END howto_operator_quicksight_create_ingestion]
+
+    # The following task checks the status of the QuickSight SPICE ingestion
+    # job until it succeeds.
+    # [START howto_sensor_quicksight]
+    quicksight_job_status = QuickSightSensor(
+        data_set_id=DATA_SET_ID,
+        ingestion_id=INGESTION_NO_WAITING_ID,
+        task_id="check_quicksight_job_status",
+    )
+    # [END howto_sensor_quicksight]
+    quicksight_create_ingestion_no_waiting >> quicksight_job_status
diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py
new file mode 100644
index 0000000000..a7e90c36cf
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/quicksight.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import time
+
+from botocore.exceptions import ClientError
+
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+class QuickSightHook(AwsBaseHook):
+    """
+    Interact with Amazon QuickSight.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    NON_TERMINAL_STATES = {"INITIALIZED", "QUEUED", "RUNNING"}
+    FAILED_STATES = {"FAILED"}
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type="quicksight", *args, **kwargs)
+
+    @cached_property
+    def sts_hook(self):
+        return StsHook(aws_conn_id=self.aws_conn_id)
+
+    def create_ingestion(
+        self,
+        data_set_id: str,
+        ingestion_id: str,
+        ingestion_type: str,
+        wait_for_completion: bool = True,
+        check_interval: int = 30,
+    ):
+        """
+        Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets.
+
+        :param data_set_id:  ID of the dataset used in the ingestion.
+        :param ingestion_id: ID for the ingestion.
+        :param ingestion_type: Type of ingestion: "INCREMENTAL_REFRESH"|"FULL_REFRESH"
+        :param wait_for_completion: whether to keep running until the ingestion job finishes
+        :param check_interval: the time interval, in seconds, at which the operator
+            will check the status of the QuickSight Ingestion
+        :return: Returns descriptive information about the created data ingestion,
+            including the ingestion ARN, HTTP status, ingestion ID and ingestion status.
+        :rtype: Dict
+        """
+
+        self.log.info("Creating QuickSight Ingestion for data set id %s.", 
data_set_id)
+        quicksight_client = self.get_conn()
+        try:
+            aws_account_id = self.sts_hook.get_account_number()
+            create_ingestion_response = quicksight_client.create_ingestion(
+                DataSetId=data_set_id,
+                IngestionId=ingestion_id,
+                IngestionType=ingestion_type,
+                AwsAccountId=aws_account_id,
+            )
+
+            if wait_for_completion:
+                self.wait_for_state(
+                    aws_account_id=aws_account_id,
+                    data_set_id=data_set_id,
+                    ingestion_id=ingestion_id,
+                    target_state={"COMPLETED"},
+                    check_interval=check_interval,
+                )
+            return create_ingestion_response
+        except Exception as general_error:
+            self.log.error("Failed to run Amazon QuickSight create_ingestion 
API, error: %s", general_error)
+            raise
+
+    def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str):
+        """
+        Get the current status of QuickSight Create Ingestion API.
+
+        :param aws_account_id: An AWS Account ID
+        :param data_set_id: QuickSight Data Set ID
+        :param ingestion_id: QuickSight Ingestion ID
+        :return: A QuickSight Ingestion Status
+        :rtype: str
+        """
+        try:
+            describe_ingestion_response = self.get_conn().describe_ingestion(
+                AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id
+            )
+            return describe_ingestion_response["Ingestion"]["IngestionStatus"]
+        except KeyError:
+            raise AirflowException("Could not get status of the Amazon 
QuickSight Ingestion")
+        except ClientError:
+            raise AirflowException("AWS request failed, check logs for more 
info")
+
+    def wait_for_state(
+        self,
+        aws_account_id: str,
+        data_set_id: str,
+        ingestion_id: str,
+        target_state: set,
+        check_interval: int,
+    ):
+        """
+        Check status of a QuickSight Create Ingestion API
+
+        :param aws_account_id: An AWS Account ID
+        :param data_set_id: QuickSight Data Set ID
+        :param ingestion_id: QuickSight Ingestion ID
+        :param target_state: Describes the QuickSight Job's Target State
+        :param check_interval: the time interval, in seconds, at which the operator
+            will check the status of the QuickSight Ingestion
+        :return: response of describe_ingestion call after the Ingestion is done
+        """
+
+        sec = 0
+        status = self.get_status(aws_account_id, data_set_id, ingestion_id)
+        while status in self.NON_TERMINAL_STATES and status not in target_state:
+            self.log.info("Current status is %s", status)
+            time.sleep(check_interval)
+            sec += check_interval
+            if status in self.FAILED_STATES:
+                raise AirflowException("The Amazon QuickSight Ingestion 
failed!")
+            if status == "CANCELLED":
+                raise AirflowException("The Amazon QuickSight SPICE ingestion 
cancelled!")
+            status = self.get_status(aws_account_id, data_set_id, ingestion_id)
+
+        self.log.info("QuickSight Ingestion completed")
+        return status
diff --git a/airflow/providers/amazon/aws/hooks/sts.py b/airflow/providers/amazon/aws/hooks/sts.py
new file mode 100644
index 0000000000..aff787ee5d
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/sts.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class StsHook(AwsBaseHook):
+    """
+    Interact with AWS Security Token Service (STS)
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type="sts", *args, **kwargs)
+
+    def get_account_number(self) -> str:
+        """Get the account Number"""
+
+        try:
+            return self.get_conn().get_caller_identity()['Account']
+        except Exception as general_error:
+            self.log.error("Failed to get the AWS Account Number, error: %s", 
general_error)
+            raise
diff --git a/airflow/providers/amazon/aws/operators/quicksight.py b/airflow/providers/amazon/aws/operators/quicksight.py
new file mode 100644
index 0000000000..a9da61bdfc
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/quicksight.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+DEFAULT_CONN_ID = "aws_default"
+
+
+class QuickSightCreateIngestionOperator(BaseOperator):
+    """
+    Creates and starts a new SPICE ingestion for a dataset.
+    Also helps to refresh existing SPICE datasets.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:QuickSightCreateIngestionOperator`
+
+    :param data_set_id:  ID of the dataset used in the ingestion.
+    :param ingestion_id: ID for the ingestion.
+    :param ingestion_type: Type of ingestion. Values can be INCREMENTAL_REFRESH or FULL_REFRESH.
+        Default: FULL_REFRESH.
+    :param wait_for_completion: If True, wait for the Amazon QuickSight Ingestion to complete
+        before returning.
+    :param check_interval: if wait_for_completion is True, the time interval, in seconds,
+        at which the operator will check the status of the Amazon QuickSight Ingestion
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    :param region: Which AWS region the connection should use. (templated)
+         If this is None or empty then the default boto3 behaviour is used.
+    """
+
+    template_fields: Sequence[str] = (
+        "data_set_id",
+        "ingestion_id",
+        "ingestion_type",
+        "wait_for_completion",
+        "check_interval",
+        "aws_conn_id",
+        "region",
+    )
+    ui_color = "#ffd700"
+
+    def __init__(
+        self,
+        data_set_id: str,
+        ingestion_id: str,
+        ingestion_type: str = "FULL_REFRESH",
+        wait_for_completion: bool = True,
+        check_interval: int = 30,
+        aws_conn_id: str = DEFAULT_CONN_ID,
+        region: Optional[str] = None,
+        **kwargs,
+    ):
+        self.data_set_id = data_set_id
+        self.ingestion_id = ingestion_id
+        self.ingestion_type = ingestion_type
+        self.wait_for_completion = wait_for_completion
+        self.check_interval = check_interval
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+        super().__init__(**kwargs)
+
+    def execute(self, context: "Context"):
+        hook = QuickSightHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+        self.log.info("Running the Amazon QuickSight SPICE Ingestion on 
Dataset ID: %s)", self.data_set_id)
+        return hook.create_ingestion(
+            data_set_id=self.data_set_id,
+            ingestion_id=self.ingestion_id,
+            ingestion_type=self.ingestion_type,
+            wait_for_completion=self.wait_for_completion,
+            check_interval=self.check_interval,
+        )
diff --git a/airflow/providers/amazon/aws/sensors/quicksight.py b/airflow/providers/amazon/aws/sensors/quicksight.py
new file mode 100644
index 0000000000..da94980e80
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/quicksight.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from typing import TYPE_CHECKING, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+
+class QuickSightSensor(BaseSensorOperator):
+    """
+    Watches for the status of an Amazon QuickSight Ingestion.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:QuickSightSensor`
+
+    :param data_set_id:  ID of the dataset used in the ingestion.
+    :param ingestion_id: ID for the ingestion.
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
+         If this is None or empty then the default boto3 behaviour is used. If
+         running Airflow in a distributed manner and aws_conn_id is None or
+         empty, then the default boto3 configuration would be used (and must be
+         maintained on each worker node).
+    """
+
+    def __init__(
+        self,
+        *,
+        data_set_id: str,
+        ingestion_id: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.data_set_id = data_set_id
+        self.ingestion_id = ingestion_id
+        self.aws_conn_id = aws_conn_id
+        self.success_status = "COMPLETED"
+        self.errored_statuses = ("FAILED", "CANCELLED")
+        self.quicksight_hook: Optional[QuickSightHook] = None
+        self.sts_hook: Optional[StsHook] = None
+
+    def poke(self, context: "Context"):
+        """
+        Pokes until the QuickSight Ingestion has successfully finished.
+
+        :param context: The task context during execution.
+        :return: True if it COMPLETED and False if not.
+        :rtype: bool
+        """
+        quicksight_hook = self.get_quicksight_hook
+        sts_hook = self.get_sts_hook
+        self.log.info("Poking for Amazon QuickSight Ingestion ID: %s", 
self.ingestion_id)
+        aws_account_id = sts_hook.get_account_number()
+        quicksight_ingestion_state = quicksight_hook.get_status(
+            aws_account_id, self.data_set_id, self.ingestion_id
+        )
+        self.log.info("QuickSight Status: %s", quicksight_ingestion_state)
+        if quicksight_ingestion_state in self.errored_statuses:
+            raise AirflowException("The QuickSight Ingestion failed!")
+        return quicksight_ingestion_state == self.success_status
+
+    @cached_property
+    def get_quicksight_hook(self):
+        return QuickSightHook(aws_conn_id=self.aws_conn_id)
+
+    @cached_property
+    def get_sts_hook(self):
+        return StsHook(aws_conn_id=self.aws_conn_id)
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 709667a80a..fdb218db1e 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -201,6 +201,16 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-amazon/operators/dms.rst
     tags: [aws]
+  - integration-name: Amazon QuickSight
+    external-doc-url: https://aws.amazon.com/quicksight/
+    logo: /integration-logos/aws/[email protected]
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/quicksight.rst
+    tags: [aws]
+  - integration-name: AWS Security Token Service (STS)
+    external-doc-url: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
+    logo: /integration-logos/aws/[email protected]
+    tags: [aws]
 
 operators:
   - integration-name: Amazon Athena
@@ -296,6 +306,9 @@ operators:
       - airflow.providers.amazon.aws.operators.redshift_sql
       - airflow.providers.amazon.aws.operators.redshift_cluster
       - airflow.providers.amazon.aws.operators.redshift_data
+  - integration-name: Amazon QuickSight
+    python-modules:
+      - airflow.providers.amazon.aws.operators.quicksight
 
 sensors:
   - integration-name: Amazon Athena
@@ -364,6 +377,9 @@ sensors:
     python-modules:
       - airflow.providers.amazon.aws.sensors.step_function_execution
       - airflow.providers.amazon.aws.sensors.step_function
+  - integration-name: Amazon QuickSight
+    python-modules:
+      - airflow.providers.amazon.aws.sensors.quicksight
 
 hooks:
   - integration-name: Amazon Athena
@@ -451,6 +467,12 @@ hooks:
   - integration-name: AWS Step Functions
     python-modules:
       - airflow.providers.amazon.aws.hooks.step_function
+  - integration-name: Amazon QuickSight
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.quicksight
+  - integration-name: AWS Security Token Service (STS)
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.sts
 
 transfers:
   - source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/operators/quicksight.rst b/docs/apache-airflow-providers-amazon/operators/quicksight.rst
new file mode 100644
index 0000000000..4a65055b67
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/quicksight.rst
@@ -0,0 +1,67 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Amazon QuickSight Operators
+========================================
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+The Airflow to Amazon QuickSight integration allows users to create and start the SPICE ingestion for a dataset.
+
+  - :class:`~airflow.providers.amazon.aws.operators.quicksight.QuickSightCreateIngestionOperator`
+  - :class:`~airflow.providers.amazon.aws.sensors.quicksight.QuickSightSensor`
+
+.. _howto/operator:QuickSightCreateIngestionOperator:
+
+Amazon QuickSight CreateIngestion Operator
+"""""""""""""""""""""""""""""""""""""""""""
+
+The QuickSightCreateIngestionOperator creates and starts a new SPICE ingestion for a dataset.
+The operator can also refresh existing SPICE datasets.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_quicksight_create_ingestion]
+    :end-before: [END howto_operator_quicksight_create_ingestion]
+
+.. _howto/sensor:QuickSightSensor:
+
+Amazon QuickSight Sensor
+""""""""""""""""""""""""
+
+The QuickSightSensor waits for the Amazon QuickSight ingestion until it reaches a terminal state.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_quicksight.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_quicksight]
+    :end-before: [END howto_sensor_quicksight]
+
+Reference
+---------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for QuickSight <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html>`__
diff --git a/docs/integration-logos/aws/[email protected] b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..5f0cecfd36
Binary files /dev/null and b/docs/integration-logos/aws/[email protected] differ
diff --git a/docs/integration-logos/aws/[email protected] b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..55007bdfab
Binary files /dev/null and b/docs/integration-logos/aws/[email protected] differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 3672c4ac2f..703c713644 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -934,6 +934,7 @@ infile
 influxdb
 infoType
 infoTypes
+ingestions
 ini
 init
 initcontainer
diff --git a/tests/providers/amazon/aws/hooks/test_quicksight.py b/tests/providers/amazon/aws/hooks/test_quicksight.py
new file mode 100644
index 0000000000..ddd712e17b
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_quicksight.py
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+AWS_ACCOUNT_ID = "123456789012"
+
+
+MOCK_DATA = {
+    "DataSetId": "DemoDataSet",
+    "IngestionId": "DemoDataSet_Ingestion",
+    "IngestionType": "INCREMENTAL_REFRESH",
+    "AwsAccountId": AWS_ACCOUNT_ID,
+}
+
+MOCK_CREATE_INGESTION_RESPONSE = {
+    "Status": 201,
+    "Arn": 
"arn:aws:quicksight:us-east-1:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+    "IngestionId": "DemoDataSet_Ingestion",
+    "IngestionStatus": "INITIALIZED",
+    "RequestId": "fc1f7eea-1327-41d6-9af7-c12f097ed343",
+}
+
+MOCK_DESCRIBE_INGESTION_SUCCESS = {
+    "Status": 200,
+    "Ingestion": {
+        "Arn": 
"arn:aws:quicksight:region:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+        "IngestionId": "DemoDataSet_Ingestion",
+        "IngestionStatus": "COMPLETED",
+        "ErrorInfo": {},
+        "RowInfo": {"RowsIngested": 228, "RowsDropped": 0, 
"TotalRowsInDataset": 228},
+        "CreatedTime": 1646589017.05,
+        "IngestionTimeInSeconds": 17,
+        "IngestionSizeInBytes": 27921,
+        "RequestSource": "MANUAL",
+        "RequestType": "FULL_REFRESH",
+    },
+    "RequestId": "DemoDataSet_Ingestion_Request_ID",
+}
+
+MOCK_DESCRIBE_INGESTION_FAILURE = {
+    "Status": 403,
+    "Ingestion": {
+        "Arn": 
"arn:aws:quicksight:region:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+        "IngestionId": "DemoDataSet_Ingestion",
+        "IngestionStatus": "Failed",
+        "ErrorInfo": {},
+        "RowInfo": {"RowsIngested": 228, "RowsDropped": 0, 
"TotalRowsInDataset": 228},
+        "CreatedTime": 1646589017.05,
+        "IngestionTimeInSeconds": 17,
+        "IngestionSizeInBytes": 27921,
+        "RequestSource": "MANUAL",
+        "RequestType": "FULL_REFRESH",
+    },
+    "RequestId": "DemoDataSet_Ingestion_Request_ID",
+}
+
+
+class TestQuicksight:
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        assert hook.conn is not None
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_create_ingestion(self, mock_get_account_number, sts_conn, mock_conn):
+        mock_conn.return_value.create_ingestion.return_value = MOCK_CREATE_INGESTION_RESPONSE
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.create_ingestion(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            ingestion_type="INCREMENTAL_REFRESH",
+        )
+        expected_call_params = MOCK_DATA
+        mock_conn.return_value.create_ingestion.assert_called_with(**expected_call_params)
+        assert result == MOCK_CREATE_INGESTION_RESPONSE
+
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_create_ingestion_exception(self, mock_get_account_number, sts_conn):
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        hook = QuickSightHook(aws_conn_id="aws_default")
+        with pytest.raises(ClientError) as raised_exception:
+            hook.create_ingestion(
+                data_set_id="DemoDataSet",
+                ingestion_id="DemoDataSet_Ingestion",
+                ingestion_type="INCREMENTAL_REFRESH",
+            )
+        ex = raised_exception.value
+        assert ex.operation_name == "CreateIngestion"
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    def test_get_job_status(self, mock_conn):
+        """
+        Test get job status
+        """
+        mock_conn.return_value.describe_ingestion.return_value = MOCK_DESCRIBE_INGESTION_SUCCESS
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.get_status(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            aws_account_id="123456789012",
+        )
+        assert result == "COMPLETED"
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    def test_get_job_status_failed(self, mock_conn):
+        """
+        Test get job status
+        """
+        mock_conn.return_value.describe_ingestion.return_value = MOCK_DESCRIBE_INGESTION_FAILURE
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.get_status(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            aws_account_id="123456789012",
+        )
+        assert result == "Failed"
diff --git a/tests/providers/amazon/aws/hooks/test_sts.py b/tests/providers/amazon/aws/hooks/test_sts.py
new file mode 100644
index 0000000000..b33ec5082b
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_sts.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+
+class TestSTS:
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = StsHook(aws_conn_id="aws_default", region_name="us-east-1")
+        assert hook.conn is not None
diff --git a/tests/providers/amazon/aws/operators/test_quicksight.py b/tests/providers/amazon/aws/operators/test_quicksight.py
new file mode 100644
index 0000000000..f1f1568765
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_quicksight.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator
+
+DATA_SET_ID = "DemoDataSet"
+INGESTION_ID = "DemoDataSet_Ingestion"
+AWS_ACCOUNT_ID = "123456789012"
+INGESTION_TYPE = "FULL_REFRESH"
+
+MOCK_RESPONSE = {
+    "Status": 201,
+    "Arn": 
"arn:aws:quicksight:us-east-1:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet_Ingestion",
+    "IngestionId": "DemoDataSet_Ingestion",
+    "IngestionStatus": "INITIALIZED",
+    "RequestId": "fc1f7eea-1327-41d6-9af7-c12f097ed343",
+}
+
+
+class TestQuickSightCreateIngestionOperator(unittest.TestCase):
+    def setUp(self):
+        self.quicksight = QuickSightCreateIngestionOperator(
+            task_id="test_quicksight_operator",
+            data_set_id=DATA_SET_ID,
+            ingestion_id=INGESTION_ID,
+        )
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    @mock.patch.object(QuickSightHook, "create_ingestion")
+    def test_execute(self, mock_create_ingestion, mock_client):
+        mock_create_ingestion.return_value = MOCK_RESPONSE
+        self.quicksight.execute(None)
+        mock_create_ingestion.assert_called_once_with(
+            data_set_id=DATA_SET_ID,
+            ingestion_id=INGESTION_ID,
+            ingestion_type="FULL_REFRESH",
+            wait_for_completion=True,
+            check_interval=30,
+        )
diff --git a/tests/providers/amazon/aws/sensors/test_quicksight.py b/tests/providers/amazon/aws/sensors/test_quicksight.py
new file mode 100644
index 0000000000..280734cf34
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_quicksight.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor
+
+AWS_ACCOUNT_ID = "123456789012"
+DATA_SET_ID = "DemoDataSet"
+INGESTION_ID = "DemoDataSet_Ingestion"
+
+
+class TestQuickSightSensor(unittest.TestCase):
+    def setUp(self):
+        self.sensor = QuickSightSensor(
+            task_id="test_quicksight_sensor",
+            aws_conn_id="aws_default",
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+        )
+
+    @mock.patch.object(QuickSightHook, "get_status")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_poke_success(self, mock_get_account_number, sts_conn, mock_get_status):
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        mock_get_status.return_value = "COMPLETED"
+        self.assertTrue(self.sensor.poke({}))
+        mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+    @mock.patch.object(QuickSightHook, "get_status")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_poke_cancelled(self, mock_get_account_number, sts_conn, mock_get_status):
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        mock_get_status.return_value = "CANCELLED"
+        with self.assertRaises(AirflowException):
+            self.sensor.poke({})
+        mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+    @mock.patch.object(QuickSightHook, "get_status")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_poke_failed(self, mock_get_account_number, sts_conn, mock_get_status):
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        mock_get_status.return_value = "FAILED"
+        with self.assertRaises(AirflowException):
+            self.sensor.poke({})
+        mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+    @mock.patch.object(QuickSightHook, "get_status")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_poke_initialized(self, mock_get_account_number, sts_conn, mock_get_status):
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        mock_get_status.return_value = "INITIALIZED"
+        self.assertFalse(self.sensor.poke({}))
+        mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
