This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 46af5baba8 Update the Athena Sample DAG and Docs (#23428)
46af5baba8 is described below

commit 46af5baba810a07eec395e89db08fc5dab175e23
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri May 6 23:28:44 2022 -0700

    Update the Athena Sample DAG and Docs (#23428)
    
    * Update the Athena Sample DAG and Docs
---
 .../amazon/aws/example_dags/example_athena.py      | 91 ++++++++--------------
 airflow/providers/amazon/aws/sensors/athena.py     |  5 ++
 .../operators/athena.rst                           | 66 ++++++++++------
 3 files changed, 80 insertions(+), 82 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_athena.py 
b/airflow/providers/amazon/aws/example_dags/example_athena.py
index 925d3598a7..fe248b2fb3 100644
--- a/airflow/providers/amazon/aws/example_dags/example_athena.py
+++ b/airflow/providers/amazon/aws/example_dags/example_athena.py
@@ -15,21 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime, timedelta
+from datetime import datetime
 from os import getenv
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.athena import AthenaOperator
+from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator, 
S3DeleteObjectsOperator
 from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
 
-# [START howto_operator_athena_env_variables]
 S3_BUCKET = getenv("S3_BUCKET", "test-bucket")
-S3_KEY = getenv("S3_KEY", "key")
-ATHENA_TABLE = getenv("ATHENA_TABLE", "test_table")
-ATHENA_DATABASE = getenv("ATHENA_DATABASE", "default")
-# [END howto_operator_athena_env_variables]
+S3_KEY = getenv('S3_KEY', 'athena-demo')
+ATHENA_TABLE = getenv('ATHENA_TABLE', 'test_table')
+ATHENA_DATABASE = getenv('ATHENA_DATABASE', 'default')
 
 SAMPLE_DATA = """"Alice",20
 "Bob",25
@@ -38,28 +37,12 @@ SAMPLE_DATA = """"Alice",20
 SAMPLE_FILENAME = 'airflow_sample.csv'
 
 
-@task(task_id='setup__add_sample_data_to_s3')
-def add_sample_data_to_s3():
-    s3_hook = S3Hook()
-    s3_hook.load_string(SAMPLE_DATA, 
f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', S3_BUCKET, replace=True)
-
-
-@task(task_id='teardown__remove_sample_data_from_s3')
-def remove_sample_data_from_s3():
-    s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', 
S3_BUCKET):
-        s3_hook.delete_objects(S3_BUCKET, 
f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}')
-
-
-@task(task_id='query__read_results_from_s3')
+@task
 def read_results_from_s3(query_execution_id):
     s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{query_execution_id}.csv', S3_BUCKET):
-        file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, 
Key=f'{S3_KEY}/{query_execution_id}.csv')
-        file_content = file_obj['Body'].read().decode('utf-8')
-        print(file_content)
-    else:
-        print('Could not find QueryExecutionId:', query_execution_id)
+    file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, 
Key=f'{S3_KEY}/{query_execution_id}.csv')
+    file_content = file_obj['Body'].read().decode('utf-8')
+    print(file_content)
 
 
 QUERY_CREATE_TABLE = f"""
@@ -82,66 +65,60 @@ with DAG(
     dag_id='example_athena',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
-    dagrun_timeout=timedelta(minutes=60),
     tags=['example'],
     catchup=False,
 ) as dag:
-    # [START howto_athena_operator_and_sensor]
 
-    # Using a task-decorated function to create a CSV file in S3
-    add_sample_data_to_s3 = add_sample_data_to_s3()
+    upload_sample_data = S3CreateObjectOperator(
+        task_id='upload_sample_data',
+        s3_bucket=S3_BUCKET,
+        s3_key=f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}',
+        data=SAMPLE_DATA,
+        replace=True,
+    )
 
     create_table = AthenaOperator(
-        task_id='setup__create_table',
+        task_id='create_table',
         query=QUERY_CREATE_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )
 
+    # [START howto_athena_operator]
     read_table = AthenaOperator(
-        task_id='query__read_table',
+        task_id='read_table',
         query=QUERY_READ_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )
+    # [END howto_athena_operator]
 
-    get_read_state = AthenaSensor(
-        task_id='query__get_read_state',
+    # [START howto_athena_sensor]
+    await_query = AthenaSensor(
+        task_id='await_query',
         query_execution_id=read_table.output,
-        max_retries=None,
-        sleep_time=10,
     )
-
-    # Using a task-decorated function to read the results from S3
-    read_results_from_s3 = read_results_from_s3(read_table.output)
+    # [END howto_athena_sensor]
 
     drop_table = AthenaOperator(
-        task_id='teardown__drop_table',
+        task_id='drop_table',
         query=QUERY_DROP_TABLE,
         database=ATHENA_DATABASE,
         output_location=f's3://{S3_BUCKET}/{S3_KEY}',
-        sleep_time=30,
-        max_tries=None,
     )
 
-    # Using a task-decorated function to delete the S3 file we created earlier
-    remove_sample_data_from_s3 = remove_sample_data_from_s3()
+    remove_s3_files = S3DeleteObjectsOperator(
+        task_id='remove_s3_files',
+        bucket=S3_BUCKET,
+        prefix=S3_KEY,
+    )
 
     (
-        add_sample_data_to_s3  # type: ignore
+        upload_sample_data
         >> create_table
         >> read_table
-        >> get_read_state
-        >> read_results_from_s3
+        >> await_query
+        >> read_results_from_s3(read_table.output)
         >> drop_table
-        >> remove_sample_data_from_s3
+        >> remove_s3_files
     )
-    # [END howto_athena_operator_and_sensor]
-
-    # Task dependencies created via `XComArgs`:
-    #   read_table >> get_read_state
-    #   read_table >> read_results_from_s3
diff --git a/airflow/providers/amazon/aws/sensors/athena.py 
b/airflow/providers/amazon/aws/sensors/athena.py
index 64e9279b4b..c94053ead6 100644
--- a/airflow/providers/amazon/aws/sensors/athena.py
+++ b/airflow/providers/amazon/aws/sensors/athena.py
@@ -36,6 +36,11 @@ class AthenaSensor(BaseSensorOperator):
     Asks for the state of the Query until it reaches a failure state or 
success state.
     If the query fails, the task will fail.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AthenaSensor`
+
+
     :param query_execution_id: query_execution_id to check the state of
     :param max_retries: Number of times to poll for query state before
         returning the current state, defaults to None
diff --git a/docs/apache-airflow-providers-amazon/operators/athena.rst 
b/docs/apache-airflow-providers-amazon/operators/athena.rst
index 47e62d160e..ea96d8127d 100644
--- a/docs/apache-airflow-providers-amazon/operators/athena.rst
+++ b/docs/apache-airflow-providers-amazon/operators/athena.rst
@@ -16,42 +16,58 @@
     under the License.
 
 
-.. _howto/operator:AthenaOperator:
-
-Amazon Athena Operator
-======================
+Amazon Athena Operators
+=======================
 
-.. contents::
-  :depth: 1
-  :local:
+`Amazon Athena <https://aws.amazon.com/athena/>`__ is an interactive query 
service
+that makes it easy to analyze data in Amazon Simple Storage Service (S3) using
 +standard SQL.  Athena is serverless, so there is no infrastructure to set up or
+manage, and you pay only for the queries you run.  To get started, simply point
+to your data in S3, define the schema, and start querying using standard SQL.
 
 Prerequisite Tasks
-------------------
+^^^^^^^^^^^^^^^^^^
 
 .. include:: _partials/prerequisite_tasks.rst
 
-Using Operator
---------------
-Use the
-:class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
-to run a query in Amazon Athena.  To get started with Amazon Athena please 
visit
-`aws.amazon.com/athena <https://aws.amazon.com/athena>`_
 
+.. _howto/operator:AthenaOperator:
+
+Athena Operator
+^^^^^^^^^^^^^^^
 
-In the following example, we create an Athena table and run a query based upon 
a CSV file
-created in an S3 bucket and populated with SAMPLE_DATA.  The example waits for 
the query
-to complete and then drops the created table and deletes the sample CSV file 
in the S3
-bucket.
+Use the :class:`~airflow.providers.amazon.aws.operators.athena.AthenaOperator`
+to run a query in Amazon Athena.
+
+
+In the following example, we query an existing Athena table and send the 
results to
+an existing Amazon S3 bucket.  For more examples of how to use this operator, 
please
+see the `Sample DAG 
<https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/example_dags/example_athena.py>`__.
 
 .. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_athena.py
     :language: python
-    :start-after: [START howto_athena_operator_and_sensor]
-    :end-before: [END howto_athena_operator_and_sensor]
+    :start-after: [START howto_athena_operator]
+    :dedent: 4
+    :end-before: [END howto_athena_operator]
+
+.. _howto/operator:AthenaSensor:
+
+Athena Sensor
+^^^^^^^^^^^^^
+
+Use the :class:`~airflow.providers.amazon.aws.sensors.athena.AthenaSensor`
+to wait for the results of a query in Amazon Athena.
+
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_athena.py
+    :language: python
+    :start-after: [START howto_athena_sensor]
+    :dedent: 4
+    :end-before: [END howto_athena_sensor]
+
 
-More information
-----------------
+Reference
+^^^^^^^^^
 
-For further information, look at the documentation of 
:meth:`~Athena.Client.start_query_execution` method
-in `boto3`_.
+For further information, look at:
 
-.. _boto3: https://pypi.org/project/boto3/
+* `Boto3 Library Documentation for Athena 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html>`__

Reply via email to