This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c149c341e Add Quicksight create ingestion Hook and Operator (#21863)
5c149c341e is described below
commit 5c149c341eb420fd5f8fc77534b1236ad8a9c6c1
Author: Harpreet Singh <[email protected]>
AuthorDate: Tue May 10 20:24:13 2022 +0530
Add Quicksight create ingestion Hook and Operator (#21863)
* Add Quicksight create ingestion Hook and Operator
Co-authored-by: eladkal <[email protected]>
---
.../amazon/aws/example_dags/example_quicksight.py | 55 ++++++++
airflow/providers/amazon/aws/hooks/quicksight.py | 153 +++++++++++++++++++++
airflow/providers/amazon/aws/hooks/sts.py | 41 ++++++
.../providers/amazon/aws/operators/quicksight.py | 98 +++++++++++++
airflow/providers/amazon/aws/sensors/quicksight.py | 96 +++++++++++++
airflow/providers/amazon/provider.yaml | 22 +++
.../operators/quicksight.rst | 67 +++++++++
docs/integration-logos/aws/[email protected] | Bin 0 -> 3484 bytes
.../aws/[email protected] | Bin 0 -> 4568 bytes
docs/spelling_wordlist.txt | 1 +
.../providers/amazon/aws/hooks/test_quicksight.py | 141 +++++++++++++++++++
tests/providers/amazon/aws/hooks/test_sts.py | 25 ++++
.../amazon/aws/operators/test_quicksight.py | 58 ++++++++
.../amazon/aws/sensors/test_quicksight.py | 77 +++++++++++
14 files changed, 834 insertions(+)
diff --git a/airflow/providers/amazon/aws/example_dags/example_quicksight.py
b/airflow/providers/amazon/aws/example_dags/example_quicksight.py
new file mode 100644
index 0000000000..dd46035daa
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_quicksight.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.quicksight import
QuickSightCreateIngestionOperator
+from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor
+
+DATA_SET_ID = os.getenv("DATA_SET_ID", "DemoDataSet_Test")
+INGESTION_NO_WAITING_ID = os.getenv("INGESTION_NO_WAITING_ID",
"DemoDataSet_Ingestion_No_Waiting_Test")
+
+with DAG(
+    dag_id="example_quicksight",
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=["example"],
+    catchup=False,
+) as dag:
+    # Create and start the QuickSight SPICE data ingestion without
+    # waiting for its completion (wait_for_completion=False).
+    # [START howto_operator_quicksight_create_ingestion]
+    quicksight_create_ingestion_no_waiting = QuickSightCreateIngestionOperator(
+        data_set_id=DATA_SET_ID,
+        ingestion_id=INGESTION_NO_WAITING_ID,
+        wait_for_completion=False,
+        task_id="sample_quicksight_no_waiting_dag",
+    )
+    # [END howto_operator_quicksight_create_ingestion]
+
+    # The following task checks the status of the QuickSight SPICE ingestion
+    # job until it succeeds.
+    # [START howto_sensor_quicksight]
+    quicksight_job_status = QuickSightSensor(
+        data_set_id=DATA_SET_ID,
+        ingestion_id=INGESTION_NO_WAITING_ID,
+        task_id="check_quicksight_job_status",
+    )
+    # [END howto_sensor_quicksight]
+    # The sensor runs only after the ingestion has been started.
+    quicksight_create_ingestion_no_waiting >> quicksight_job_status
diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py
b/airflow/providers/amazon/aws/hooks/quicksight.py
new file mode 100644
index 0000000000..a7e90c36cf
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/quicksight.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import time
+
+from botocore.exceptions import ClientError
+
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+if sys.version_info >= (3, 8):
+ from functools import cached_property
+else:
+ from cached_property import cached_property
+
+
+class QuickSightHook(AwsBaseHook):
+    """
+    Interact with Amazon QuickSight.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    # Ingestion statuses that mean "still in progress"; polling continues while in one of these.
+    NON_TERMINAL_STATES = {"INITIALIZED", "QUEUED", "RUNNING"}
+    # Ingestion statuses treated as a failure by ``wait_for_state``.
+    FAILED_STATES = {"FAILED"}
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type="quicksight", *args, **kwargs)
+
+    @cached_property
+    def sts_hook(self):
+        """STS hook on the same AWS connection, used to look up the AWS account ID."""
+        return StsHook(aws_conn_id=self.aws_conn_id)
+
+    def create_ingestion(
+        self,
+        data_set_id: str,
+        ingestion_id: str,
+        ingestion_type: str,
+        wait_for_completion: bool = True,
+        check_interval: int = 30,
+    ):
+        """
+        Creates and starts a new SPICE ingestion for a dataset. Refreshes the SPICE datasets
+
+        :param data_set_id: ID of the dataset used in the ingestion.
+        :param ingestion_id: ID for the ingestion.
+        :param ingestion_type: Type of ingestion: "INCREMENTAL_REFRESH"|"FULL_REFRESH"
+        :param wait_for_completion: if True, poll the ingestion status until it is no longer
+            in a non-terminal state before returning
+        :param check_interval: the time interval in seconds between two status checks
+            of the QuickSight Ingestion
+        :return: Returns descriptive information about the created data ingestion
+            having Ingestion ARN, HTTP status, ingestion ID and ingestion status.
+        :rtype: Dict
+        :raises AirflowException: when waiting for completion and the ingestion ends up
+            failed or cancelled
+        """
+
+        self.log.info("Creating QuickSight Ingestion for data set id %s.", data_set_id)
+        quicksight_client = self.get_conn()
+        try:
+            aws_account_id = self.sts_hook.get_account_number()
+            create_ingestion_response = quicksight_client.create_ingestion(
+                DataSetId=data_set_id,
+                IngestionId=ingestion_id,
+                IngestionType=ingestion_type,
+                AwsAccountId=aws_account_id,
+            )
+
+            if wait_for_completion:
+                self.wait_for_state(
+                    aws_account_id=aws_account_id,
+                    data_set_id=data_set_id,
+                    ingestion_id=ingestion_id,
+                    target_state={"COMPLETED"},
+                    check_interval=check_interval,
+                )
+            return create_ingestion_response
+        except Exception as general_error:
+            # Log for visibility, then let the original exception propagate unchanged.
+            self.log.error("Failed to run Amazon QuickSight create_ingestion API, error: %s", general_error)
+            raise
+
+    def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str):
+        """
+        Get the current status of QuickSight Create Ingestion API.
+
+        :param aws_account_id: An AWS Account ID
+        :param data_set_id: QuickSight Data Set ID
+        :param ingestion_id: QuickSight Ingestion ID
+        :return: An QuickSight Ingestion Status
+        :rtype: str
+        :raises AirflowException: if the describe_ingestion call fails with a ``ClientError``
+            or its response lacks the ``Ingestion``/``IngestionStatus`` keys
+        """
+        try:
+            describe_ingestion_response = self.get_conn().describe_ingestion(
+                AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id
+            )
+            return describe_ingestion_response["Ingestion"]["IngestionStatus"]
+        except KeyError as key_error:
+            # Chain the cause so the missing key is visible in the traceback.
+            raise AirflowException("Could not get status of the Amazon QuickSight Ingestion") from key_error
+        except ClientError as client_error:
+            # Chain the cause so the underlying AWS error is not lost.
+            raise AirflowException("AWS request failed, check logs for more info") from client_error
+
+    def wait_for_state(
+        self,
+        aws_account_id: str,
+        data_set_id: str,
+        ingestion_id: str,
+        target_state: set,
+        check_interval: int,
+    ):
+        """
+        Poll the status of a QuickSight ingestion until it leaves the non-terminal states
+
+        :param aws_account_id: An AWS Account ID
+        :param data_set_id: QuickSight Data Set ID
+        :param ingestion_id: QuickSight Ingestion ID
+        :param target_state: Set of ingestion statuses that end the wait
+        :param check_interval: the time interval in seconds between two status checks
+            of the QuickSight Ingestion
+        :return: the last ingestion status that was observed
+        :rtype: str
+        :raises AirflowException: if the ingestion ends up failed or cancelled
+        """
+
+        # Tolerate a single status passed as a plain string instead of a set.
+        if isinstance(target_state, str):
+            target_state = {target_state}
+
+        status = self.get_status(aws_account_id, data_set_id, ingestion_id)
+        # Membership test: comparing the status string with the set itself is always unequal.
+        while status in self.NON_TERMINAL_STATES and status not in target_state:
+            self.log.info("Current status is %s", status)
+            time.sleep(check_interval)
+            status = self.get_status(aws_account_id, data_set_id, ingestion_id)
+
+        # These checks must run on the refreshed status after the loop: inside the loop the
+        # status is by definition non-terminal, so failures would never be detected there.
+        if status in self.FAILED_STATES:
+            raise AirflowException("The Amazon QuickSight Ingestion failed!")
+        if status == "CANCELLED":
+            raise AirflowException("The Amazon QuickSight SPICE ingestion cancelled!")
+
+        self.log.info("QuickSight Ingestion completed")
+        return status
diff --git a/airflow/providers/amazon/aws/hooks/sts.py
b/airflow/providers/amazon/aws/hooks/sts.py
new file mode 100644
index 0000000000..aff787ee5d
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/sts.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class StsHook(AwsBaseHook):
+    """
+    Interact with AWS Security Token Service (STS)
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type="sts", *args, **kwargs)
+
+    def get_account_number(self) -> str:
+        """
+        Return the ``Account`` value of the STS ``get_caller_identity`` response.
+
+        Any exception raised by the call is logged and then re-raised unchanged.
+        """
+
+        try:
+            return self.get_conn().get_caller_identity()['Account']
+        except Exception as general_error:
+            self.log.error("Failed to get the AWS Account Number, error: %s", general_error)
+            raise
diff --git a/airflow/providers/amazon/aws/operators/quicksight.py
b/airflow/providers/amazon/aws/operators/quicksight.py
new file mode 100644
index 0000000000..a9da61bdfc
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/quicksight.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+DEFAULT_CONN_ID = "aws_default"
+
+
+class QuickSightCreateIngestionOperator(BaseOperator):
+    """
+    Creates and starts a new SPICE ingestion for a dataset.
+    Also, helps to Refresh existing SPICE datasets.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:QuickSightCreateIngestionOperator`
+
+    :param data_set_id: ID of the dataset used in the ingestion.
+    :param ingestion_id: ID for the ingestion.
+    :param ingestion_type: Type of ingestion. Values Can be INCREMENTAL_REFRESH or FULL_REFRESH.
+        Default FULL_REFRESH.
+    :param wait_for_completion: If True, the operator keeps checking the status of the
+        Amazon QuickSight Ingestion and only finishes once it is no longer in progress.
+        Default True.
+    :param check_interval: if wait_for_completion is True, this is the time interval
+        in seconds at which the operator checks the status of the Amazon QuickSight Ingestion
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region: Which AWS region the connection should use. (templated)
+        If this is None or empty then the default boto3 behaviour is used.
+    """
+
+    template_fields: Sequence[str] = (
+        "data_set_id",
+        "ingestion_id",
+        "ingestion_type",
+        "wait_for_completion",
+        "check_interval",
+        "aws_conn_id",
+        "region",
+    )
+    ui_color = "#ffd700"
+
+    def __init__(
+        self,
+        data_set_id: str,
+        ingestion_id: str,
+        ingestion_type: str = "FULL_REFRESH",
+        wait_for_completion: bool = True,
+        check_interval: int = 30,
+        aws_conn_id: str = DEFAULT_CONN_ID,
+        region: Optional[str] = None,
+        **kwargs,
+    ):
+        self.data_set_id = data_set_id
+        self.ingestion_id = ingestion_id
+        self.ingestion_type = ingestion_type
+        self.wait_for_completion = wait_for_completion
+        self.check_interval = check_interval
+        self.aws_conn_id = aws_conn_id
+        self.region = region
+        super().__init__(**kwargs)
+
+    def execute(self, context: "Context"):
+        """
+        Start the ingestion through :class:`QuickSightHook` and return the hook's response.
+
+        :param context: The task context during execution (not used here).
+        :return: whatever ``QuickSightHook.create_ingestion`` returns
+        """
+        hook = QuickSightHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+        # The original message ended with an unbalanced ")" after the dataset ID.
+        self.log.info("Running the Amazon QuickSight SPICE Ingestion on Dataset ID: %s", self.data_set_id)
+        return hook.create_ingestion(
+            data_set_id=self.data_set_id,
+            ingestion_id=self.ingestion_id,
+            ingestion_type=self.ingestion_type,
+            wait_for_completion=self.wait_for_completion,
+            check_interval=self.check_interval,
+        )
diff --git a/airflow/providers/amazon/aws/sensors/quicksight.py
b/airflow/providers/amazon/aws/sensors/quicksight.py
new file mode 100644
index 0000000000..da94980e80
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/quicksight.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from typing import TYPE_CHECKING, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+if sys.version_info >= (3, 8):
+ from functools import cached_property
+else:
+ from cached_property import cached_property
+
+
+class QuickSightSensor(BaseSensorOperator):
+    """
+    Watches for the status of an Amazon QuickSight Ingestion.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:QuickSightSensor`
+
+    :param data_set_id: ID of the dataset used in the ingestion.
+    :param ingestion_id: ID for the ingestion.
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (templated)
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    """
+
+    def __init__(
+        self,
+        *,
+        data_set_id: str,
+        ingestion_id: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.data_set_id = data_set_id
+        self.ingestion_id = ingestion_id
+        self.aws_conn_id = aws_conn_id
+        # Status that makes poke() return True.
+        self.success_status = "COMPLETED"
+        # Statuses that make poke() raise instead of poking again.
+        self.errored_statuses = ("FAILED", "CANCELLED")
+        # NOTE(review): the two attributes below are never read in this class; poke() uses the
+        # cached properties get_quicksight_hook / get_sts_hook instead.
+        self.quicksight_hook: Optional[QuickSightHook] = None
+        self.sts_hook: Optional[StsHook] = None
+
+    def poke(self, context: "Context"):
+        """
+        Check the current status of the QuickSight Ingestion once.
+
+        The AWS account ID is looked up through the STS hook on every poke.
+
+        :param context: The task context during execution.
+        :return: True if the ingestion status is COMPLETED and False if not.
+        :rtype: bool
+        :raises AirflowException: if the ingestion status is FAILED or CANCELLED.
+        """
+        quicksight_hook = self.get_quicksight_hook
+        sts_hook = self.get_sts_hook
+        self.log.info("Poking for Amazon QuickSight Ingestion ID: %s", self.ingestion_id)
+        aws_account_id = sts_hook.get_account_number()
+        quicksight_ingestion_state = quicksight_hook.get_status(
+            aws_account_id, self.data_set_id, self.ingestion_id
+        )
+        self.log.info("QuickSight Status: %s", quicksight_ingestion_state)
+        if quicksight_ingestion_state in self.errored_statuses:
+            raise AirflowException("The QuickSight Ingestion failed!")
+        return quicksight_ingestion_state == self.success_status
+
+    @cached_property
+    def get_quicksight_hook(self):
+        """QuickSight hook built from ``aws_conn_id``; created once and cached (accessed as an attribute)."""
+        return QuickSightHook(aws_conn_id=self.aws_conn_id)
+
+    @cached_property
+    def get_sts_hook(self):
+        """STS hook built from ``aws_conn_id``; created once and cached (accessed as an attribute)."""
+        return StsHook(aws_conn_id=self.aws_conn_id)
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index 709667a80a..fdb218db1e 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -201,6 +201,16 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/dms.rst
tags: [aws]
+ - integration-name: Amazon QuickSight
+ external-doc-url: https://aws.amazon.com/quicksight/
+ logo: /integration-logos/aws/[email protected]
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/quicksight.rst
+ tags: [aws]
+ - integration-name: AWS Security Token Service (STS)
+ external-doc-url:
https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
+ logo: /integration-logos/aws/[email protected]
+ tags: [aws]
operators:
- integration-name: Amazon Athena
@@ -296,6 +306,9 @@ operators:
- airflow.providers.amazon.aws.operators.redshift_sql
- airflow.providers.amazon.aws.operators.redshift_cluster
- airflow.providers.amazon.aws.operators.redshift_data
+ - integration-name: Amazon QuickSight
+ python-modules:
+ - airflow.providers.amazon.aws.operators.quicksight
sensors:
- integration-name: Amazon Athena
@@ -364,6 +377,9 @@ sensors:
python-modules:
- airflow.providers.amazon.aws.sensors.step_function_execution
- airflow.providers.amazon.aws.sensors.step_function
+ - integration-name: Amazon QuickSight
+ python-modules:
+ - airflow.providers.amazon.aws.sensors.quicksight
hooks:
- integration-name: Amazon Athena
@@ -451,6 +467,12 @@ hooks:
- integration-name: AWS Step Functions
python-modules:
- airflow.providers.amazon.aws.hooks.step_function
+ - integration-name: Amazon QuickSight
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.quicksight
+ - integration-name: AWS Security Token Service (STS)
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.sts
transfers:
- source-integration-name: Amazon DynamoDB
diff --git a/docs/apache-airflow-providers-amazon/operators/quicksight.rst
b/docs/apache-airflow-providers-amazon/operators/quicksight.rst
new file mode 100644
index 0000000000..4a65055b67
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/quicksight.rst
@@ -0,0 +1,67 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Amazon QuickSight Operators
+========================================
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+Overview
+--------
+
+The Airflow integration with Amazon QuickSight allows users to create and start a SPICE ingestion for a dataset.
+
+ -
:class:`~airflow.providers.amazon.aws.operators.quicksight.QuickSightCreateIngestionOperator`
+ - :class:`~airflow.providers.amazon.aws.sensors.quicksight.QuickSightSensor`
+
+.. _howto/operator:QuickSightCreateIngestionOperator:
+
+Amazon QuickSight CreateIngestion Operator
+"""""""""""""""""""""""""""""""""""""""""""
+
+The QuickSightCreateIngestionOperator creates and starts a new SPICE ingestion for a dataset.
+The operator also refreshes existing SPICE datasets.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_quicksight.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_quicksight_create_ingestion]
+ :end-before: [END howto_operator_quicksight_create_ingestion]
+
+.. _howto/sensor:QuickSightSensor:
+
+Amazon QuickSight Sensor
+""""""""""""""""""""""""
+
+The QuickSightSensor waits for an Amazon QuickSight ingestion until it reaches a terminal state.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_quicksight.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_quicksight]
+ :end-before: [END howto_sensor_quicksight]
+
+Reference
+---------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for QuickSight
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/quicksight.html>`__
diff --git a/docs/integration-logos/aws/[email protected]
b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..5f0cecfd36
Binary files /dev/null and b/docs/integration-logos/aws/[email protected]
differ
diff --git a/docs/integration-logos/aws/[email protected]
b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..55007bdfab
Binary files /dev/null and
b/docs/integration-logos/aws/[email protected] differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 3672c4ac2f..703c713644 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -934,6 +934,7 @@ infile
influxdb
infoType
infoTypes
+ingestions
ini
init
initcontainer
diff --git a/tests/providers/amazon/aws/hooks/test_quicksight.py
b/tests/providers/amazon/aws/hooks/test_quicksight.py
new file mode 100644
index 0000000000..ddd712e17b
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_quicksight.py
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+AWS_ACCOUNT_ID = "123456789012"
+
+
+MOCK_DATA = {
+ "DataSetId": "DemoDataSet",
+ "IngestionId": "DemoDataSet_Ingestion",
+ "IngestionType": "INCREMENTAL_REFRESH",
+ "AwsAccountId": AWS_ACCOUNT_ID,
+}
+
+MOCK_CREATE_INGESTION_RESPONSE = {
+ "Status": 201,
+ "Arn":
"arn:aws:quicksight:us-east-1:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+ "IngestionId": "DemoDataSet_Ingestion",
+ "IngestionStatus": "INITIALIZED",
+ "RequestId": "fc1f7eea-1327-41d6-9af7-c12f097ed343",
+}
+
+MOCK_DESCRIBE_INGESTION_SUCCESS = {
+ "Status": 200,
+ "Ingestion": {
+ "Arn":
"arn:aws:quicksight:region:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+ "IngestionId": "DemoDataSet_Ingestion",
+ "IngestionStatus": "COMPLETED",
+ "ErrorInfo": {},
+ "RowInfo": {"RowsIngested": 228, "RowsDropped": 0,
"TotalRowsInDataset": 228},
+ "CreatedTime": 1646589017.05,
+ "IngestionTimeInSeconds": 17,
+ "IngestionSizeInBytes": 27921,
+ "RequestSource": "MANUAL",
+ "RequestType": "FULL_REFRESH",
+ },
+ "RequestId": "DemoDataSet_Ingestion_Request_ID",
+}
+
+MOCK_DESCRIBE_INGESTION_FAILURE = {
+ "Status": 403,
+ "Ingestion": {
+ "Arn":
"arn:aws:quicksight:region:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet3_Ingestion",
+ "IngestionId": "DemoDataSet_Ingestion",
+ "IngestionStatus": "Failed",
+ "ErrorInfo": {},
+ "RowInfo": {"RowsIngested": 228, "RowsDropped": 0,
"TotalRowsInDataset": 228},
+ "CreatedTime": 1646589017.05,
+ "IngestionTimeInSeconds": 17,
+ "IngestionSizeInBytes": 27921,
+ "RequestSource": "MANUAL",
+ "RequestType": "FULL_REFRESH",
+ },
+ "RequestId": "DemoDataSet_Ingestion_Request_ID",
+}
+
+
+class TestQuicksight:
+    """Tests for QuickSightHook."""
+
+    def test_get_conn_returns_a_boto3_connection(self):
+        """The hook exposes a non-None ``conn``."""
+        hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        assert hook.conn is not None
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_create_ingestion(self, mock_get_account_number, sts_conn, mock_conn):
+        """create_ingestion forwards the expected parameters and returns the client response."""
+        mock_conn.return_value.create_ingestion.return_value = MOCK_CREATE_INGESTION_RESPONSE
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.create_ingestion(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            ingestion_type="INCREMENTAL_REFRESH",
+        )
+        expected_call_params = MOCK_DATA
+        mock_conn.return_value.create_ingestion.assert_called_with(**expected_call_params)
+        assert result == MOCK_CREATE_INGESTION_RESPONSE
+
+    @mock.patch.object(StsHook, "get_conn")
+    @mock.patch.object(StsHook, "get_account_number")
+    def test_create_ingestion_exception(self, mock_get_account_number, sts_conn):
+        """create_ingestion lets a ClientError from the CreateIngestion operation propagate."""
+        mock_get_account_number.return_value = AWS_ACCOUNT_ID
+        # QuickSightHook.get_conn is not patched in this test.
+        hook = QuickSightHook(aws_conn_id="aws_default")
+        with pytest.raises(ClientError) as raised_exception:
+            hook.create_ingestion(
+                data_set_id="DemoDataSet",
+                ingestion_id="DemoDataSet_Ingestion",
+                ingestion_type="INCREMENTAL_REFRESH",
+            )
+        ex = raised_exception.value
+        assert ex.operation_name == "CreateIngestion"
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    def test_get_job_status(self, mock_conn):
+        """
+        get_status returns the IngestionStatus of a successful describe_ingestion response
+        """
+        mock_conn.return_value.describe_ingestion.return_value = MOCK_DESCRIBE_INGESTION_SUCCESS
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.get_status(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            aws_account_id="123456789012",
+        )
+        assert result == "COMPLETED"
+
+    @mock.patch.object(QuickSightHook, "get_conn")
+    def test_get_job_status_failed(self, mock_conn):
+        """
+        get_status returns the IngestionStatus of the failure fixture unchanged
+        """
+        mock_conn.return_value.describe_ingestion.return_value = MOCK_DESCRIBE_INGESTION_FAILURE
+        quicksight_hook = QuickSightHook(aws_conn_id="aws_default", region_name="us-east-1")
+        result = quicksight_hook.get_status(
+            data_set_id="DemoDataSet",
+            ingestion_id="DemoDataSet_Ingestion",
+            aws_account_id="123456789012",
+        )
+        assert result == "Failed"
diff --git a/tests/providers/amazon/aws/hooks/test_sts.py
b/tests/providers/amazon/aws/hooks/test_sts.py
new file mode 100644
index 0000000000..b33ec5082b
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_sts.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+
+
+class TestSTS:
+    """Tests for StsHook."""
+
+    def test_get_conn_returns_a_boto3_connection(self):
+        """The hook exposes a non-None ``conn``."""
+        hook = StsHook(aws_conn_id="aws_default", region_name="us-east-1")
+        assert hook.conn is not None
diff --git a/tests/providers/amazon/aws/operators/test_quicksight.py
b/tests/providers/amazon/aws/operators/test_quicksight.py
new file mode 100644
index 0000000000..f1f1568765
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_quicksight.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.operators.quicksight import QuickSightCreateIngestionOperator
+
+DATA_SET_ID = "DemoDataSet"
+INGESTION_ID = "DemoDataSet_Ingestion"
+AWS_ACCOUNT_ID = "123456789012"
+INGESTION_TYPE = "FULL_REFRESH"
+
+MOCK_RESPONSE = {
+ "Status": 201,
+ "Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/DemoDataSet/ingestion/DemoDataSet_Ingestion",
+ "IngestionId": "DemoDataSet_Ingestion",
+ "IngestionStatus": "INITIALIZED",
+ "RequestId": "fc1f7eea-1327-41d6-9af7-c12f097ed343",
+}
+
+
+class TestQuickSightCreateIngestionOperator(unittest.TestCase):
+ def setUp(self):
+ self.quicksight = QuickSightCreateIngestionOperator(
+ task_id="test_quicksight_operator",
+ data_set_id=DATA_SET_ID,
+ ingestion_id=INGESTION_ID,
+ )
+
+ @mock.patch.object(QuickSightHook, "get_conn")
+ @mock.patch.object(QuickSightHook, "create_ingestion")
+ def test_execute(self, mock_create_ingestion, mock_client):
+ mock_create_ingestion.return_value = MOCK_RESPONSE
+ self.quicksight.execute(None)
+ mock_create_ingestion.assert_called_once_with(
+ data_set_id=DATA_SET_ID,
+ ingestion_id=INGESTION_ID,
+ ingestion_type="FULL_REFRESH",
+ wait_for_completion=True,
+ check_interval=30,
+ )
diff --git a/tests/providers/amazon/aws/sensors/test_quicksight.py b/tests/providers/amazon/aws/sensors/test_quicksight.py
new file mode 100644
index 0000000000..280734cf34
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_quicksight.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.quicksight import QuickSightHook
+from airflow.providers.amazon.aws.hooks.sts import StsHook
+from airflow.providers.amazon.aws.sensors.quicksight import QuickSightSensor
+
+AWS_ACCOUNT_ID = "123456789012"
+DATA_SET_ID = "DemoDataSet"
+INGESTION_ID = "DemoDataSet_Ingestion"
+
+
+class TestQuickSightSensor(unittest.TestCase):
+ def setUp(self):
+ self.sensor = QuickSightSensor(
+ task_id="test_quicksight_sensor",
+ aws_conn_id="aws_default",
+ data_set_id="DemoDataSet",
+ ingestion_id="DemoDataSet_Ingestion",
+ )
+
+ @mock.patch.object(QuickSightHook, "get_status")
+ @mock.patch.object(StsHook, "get_conn")
+ @mock.patch.object(StsHook, "get_account_number")
+ def test_poke_success(self, mock_get_account_number, sts_conn, mock_get_status):
+ mock_get_account_number.return_value = AWS_ACCOUNT_ID
+ mock_get_status.return_value = "COMPLETED"
+ self.assertTrue(self.sensor.poke({}))
+ mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+ @mock.patch.object(QuickSightHook, "get_status")
+ @mock.patch.object(StsHook, "get_conn")
+ @mock.patch.object(StsHook, "get_account_number")
+ def test_poke_cancelled(self, mock_get_account_number, sts_conn, mock_get_status):
+ mock_get_account_number.return_value = AWS_ACCOUNT_ID
+ mock_get_status.return_value = "CANCELLED"
+ with self.assertRaises(AirflowException):
+ self.sensor.poke({})
+ mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+ @mock.patch.object(QuickSightHook, "get_status")
+ @mock.patch.object(StsHook, "get_conn")
+ @mock.patch.object(StsHook, "get_account_number")
+ def test_poke_failed(self, mock_get_account_number, sts_conn, mock_get_status):
+ mock_get_account_number.return_value = AWS_ACCOUNT_ID
+ mock_get_status.return_value = "FAILED"
+ with self.assertRaises(AirflowException):
+ self.sensor.poke({})
+ mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)
+
+ @mock.patch.object(QuickSightHook, "get_status")
+ @mock.patch.object(StsHook, "get_conn")
+ @mock.patch.object(StsHook, "get_account_number")
+ def test_poke_initialized(self, mock_get_account_number, sts_conn, mock_get_status):
+ mock_get_account_number.return_value = AWS_ACCOUNT_ID
+ mock_get_status.return_value = "INITIALIZED"
+ self.assertFalse(self.sensor.poke({}))
+ mock_get_status.assert_called_once_with(AWS_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)