This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 46af5baba8 Update the Athena Sample DAG and Docs (#23428)
46af5baba8 is described below
commit 46af5baba810a07eec395e89db08fc5dab175e23
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri May 6 23:28:44 2022 -0700
Update the Athena Sample DAG and Docs (#23428)
* Update the Athena Sample DAG and Docs
---
.../amazon/aws/example_dags/example_athena.py | 91 ++++++++--------------
airflow/providers/amazon/aws/sensors/athena.py | 5 ++
.../operators/athena.rst | 66 ++++++++++------
3 files changed, 80 insertions(+), 82 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_athena.py
b/airflow/providers/amazon/aws/example_dags/example_athena.py
index 925d3598a7..fe248b2fb3 100644
--- a/airflow/providers/amazon/aws/example_dags/example_athena.py
+++ b/airflow/providers/amazon/aws/example_dags/example_athena.py
@@ -15,21 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime, timedelta
+from datetime import datetime
from os import getenv
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
+from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator,
S3DeleteObjectsOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
-# [START howto_operator_athena_env_variables]
S3_BUCKET = getenv("S3_BUCKET", "test-bucket")
-S3_KEY = getenv("S3_KEY", "key")
-ATHENA_TABLE = getenv("ATHENA_TABLE", "test_table")
-ATHENA_DATABASE = getenv("ATHENA_DATABASE", "default")
-# [END howto_operator_athena_env_variables]
+S3_KEY = getenv('S3_KEY', 'athena-demo')
+ATHENA_TABLE = getenv('ATHENA_TABLE', 'test_table')
+ATHENA_DATABASE = getenv('ATHENA_DATABASE', 'default')
SAMPLE_DATA = """"Alice",20
"Bob",25
@@ -38,28 +37,12 @@ SAMPLE_DATA = """"Alice",20
SAMPLE_FILENAME = 'airflow_sample.csv'
-@task(task_id='setup__add_sample_data_to_s3')
-def add_sample_data_to_s3():
- s3_hook = S3Hook()
- s3_hook.load_string(SAMPLE_DATA,
f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', S3_BUCKET, replace=True)
-
-
-@task(task_id='teardown__remove_sample_data_from_s3')
-def remove_sample_data_from_s3():
- s3_hook = S3Hook()
- if s3_hook.check_for_key(f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}',
S3_BUCKET):
- s3_hook.delete_objects(S3_BUCKET,
f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}')
-
-
-@task(task_id='query__read_results_from_s3')
+@task
def read_results_from_s3(query_execution_id):
s3_hook = S3Hook()
- if s3_hook.check_for_key(f'{S3_KEY}/{query_execution_id}.csv', S3_BUCKET):
- file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET,
Key=f'{S3_KEY}/{query_execution_id}.csv')
- file_content = file_obj['Body'].read().decode('utf-8')
- print(file_content)
- else:
- print('Could not find QueryExecutionId:', query_execution_id)
+ file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET,
Key=f'{S3_KEY}/{query_execution_id}.csv')
+ file_content = file_obj['Body'].read().decode('utf-8')
+ print(file_content)
QUERY_CREATE_TABLE = f"""
@@ -82,66 +65,60 @@ with DAG(
dag_id='example_athena',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
- dagrun_timeout=timedelta(minutes=60),
tags=['example'],
catchup=False,
) as dag:
- # [START howto_athena_operator_and_sensor]
- # Using a task-decorated function to create a CSV file in S3
- add_sample_data_to_s3 = add_sample_data_to_s3()
+ upload_sample_data = S3CreateObjectOperator(
+ task_id='upload_sample_data',
+ s3_bucket=S3_BUCKET,
+ s3_key=f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}',
+ data=SAMPLE_DATA,
+ replace=True,
+ )
create_table = AthenaOperator(
- task_id='setup__create_table',
+ task_id='create_table',
query=QUERY_CREATE_TABLE,
database=ATHENA_DATABASE,
output_location=f's3://{S3_BUCKET}/{S3_KEY}',
- sleep_time=30,
- max_tries=None,
)
+ # [START howto_athena_operator]
read_table = AthenaOperator(
- task_id='query__read_table',
+ task_id='read_table',
query=QUERY_READ_TABLE,
database=ATHENA_DATABASE,
output_location=f's3://{S3_BUCKET}/{S3_KEY}',
- sleep_time=30,
- max_tries=None,
)
+ # [END howto_athena_operator]
- get_read_state = AthenaSensor(
- task_id='query__get_read_state',
+ # [START howto_athena_sensor]
+ await_query = AthenaSensor(
+ task_id='await_query',
query_execution_id=read_table.output,
- max_retries=None,
- sleep_time=10,
)
-
- # Using a task-decorated function to read the results from S3
- read_results_from_s3 = read_results_from_s3(read_table.output)
+ # [END howto_athena_sensor]
drop_table = AthenaOperator(
- task_id='teardown__drop_table',
+ task_id='drop_table',
query=QUERY_DROP_TABLE,
database=ATHENA_DATABASE,
output_location=f's3://{S3_BUCKET}/{S3_KEY}',
- sleep_time=30,
- max_tries=None,
)
- # Using a task-decorated function to delete the S3 file we created earlier
- remove_sample_data_from_s3 = remove_sample_data_from_s3()
+ remove_s3_files = S3DeleteObjectsOperator(
+ task_id='remove_s3_files',
+ bucket=S3_BUCKET,
+ prefix=S3_KEY,
+ )
(
- add_sample_data_to_s3 # type: ignore
+ upload_sample_data
>> create_table
>> read_table
- >> get_read_state
- >> read_results_from_s3
+ >> await_query
+ >> read_results_from_s3(read_table.output)
>> drop_table
- >> remove_sample_data_from_s3
+ >> remove_s3_files
)
- # [END howto_athena_operator_and_sensor]
-
- # Task dependencies created via `XComArgs`:
- # read_table >> get_read_state
- # read_table >> read_results_from_s3
diff --git a/airflow/providers/amazon/aws/sensors/athena.py
b/airflow/providers/amazon/aws/sensors/athena.py
index 64e9279b4b..c94053ead6 100644
--- a/airflow/providers/amazon/aws/sensors/athena.py
+++ b/airflow/providers/amazon/aws/sensors/athena.py
@@ -36,6 +36,11 @@ class AthenaSensor(BaseSensorOperator):
Asks for the state of the Query until it reaches a failure state or
success state.
If the query fails, the task will fail.
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:AthenaSensor`
+
+
:param query_execution_id: query_execution_id to check the state of
:param max_retries: Number of times to poll for query state before
returning the current state, defaults to None
diff --git a/docs/apache-airflow-providers-amazon/operators/athena.rst
b/docs/apache-airflow-providers-amazon/operators/athena.rst
index 47e62d160e..ea96d8127d 100644
--- a/docs/apache-airflow-providers-amazon/operators/athena.rst
+++ b/docs/apache-airflow-providers-amazon/operators/athena.rst
@@ -16,42 +16,58 @@
under the License.
-.. _howto/operator:AthenaOperator:
-
-Amazon Athena Operator
-======================
+Amazon Athena Operators
+=======================
-.. contents::
- :depth: 1
- :local:
+`Amazon Athena <https://aws.amazon.com/athena/>`__ is an interactive query
service
+that makes it easy to analyze data in Amazon Simple Storage Service (S3) using
+standard SQL. Athena is serverless, so there is no infrastructure to set up or
+manage, and you pay only for the queries you run. To get started, simply point
+to your data in S3, define the schema, and start querying using standard SQL.
Prerequisite Tasks
-------------------
+^^^^^^^^^^^^^^^^^^
.. include:: _partials/prerequisite_tasks.rst
-Using Operator
---------------
-Use the
-:class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
-to run a query in Amazon Athena. To get started with Amazon Athena please
visit
-`aws.amazon.com/athena <https://aws.amazon.com/athena>`_
+.. _howto/operator:AthenaOperator:
+
+Athena Operator
+^^^^^^^^^^^^^^^
-In the following example, we create an Athena table and run a query based upon
a CSV file
-created in an S3 bucket and populated with SAMPLE_DATA. The example waits for
the query
-to complete and then drops the created table and deletes the sample CSV file
in the S3
-bucket.
+Use the :class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
+to run a query in Amazon Athena.
+
+
+In the following example, we query an existing Athena table and send the
results to
+an existing Amazon S3 bucket. For more examples of how to use this operator,
please
+see the `Sample DAG
<https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_athena.py>`__.
.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_athena.py
:language: python
- :start-after: [START howto_athena_operator_and_sensor]
- :end-before: [END howto_athena_operator_and_sensor]
+ :start-after: [START howto_athena_operator]
+ :dedent: 4
+ :end-before: [END howto_athena_operator]
+
+.. _howto/operator:AthenaSensor:
+
+Athena Sensor
+^^^^^^^^^^^^^
+
+Use the :class:`~airflow.providers.amazon.aws.sensors.athena.AthenaSensor`
+to wait for the results of a query in Amazon Athena.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_athena.py
+ :language: python
+ :start-after: [START howto_athena_sensor]
+ :dedent: 4
+ :end-before: [END howto_athena_sensor]
+
-More information
-----------------
+Reference
+^^^^^^^^^
-For further information, look at the documentation of
:meth:`~Athena.Client.start_query_execution` method
-in `boto3`_.
+For further information, look at:
-.. _boto3: https://pypi.org/project/boto3/
+* `Boto3 Library Documentation for Athena
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html>`__