This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0751b77048 Convert S3 example DAG to System test (AIP-47) (#26535)
0751b77048 is described below
commit 0751b7704841fee83d2f3f65dcde62c095a5dd13
Author: syedahsn <[email protected]>
AuthorDate: Wed Sep 21 03:03:26 2022 -0700
Convert S3 example DAG to System test (AIP-47) (#26535)
* S3 system test (AIP-47) using SystemTestContextBuilder
* Move constants to module level
* Add task to delete second bucket
---
.../operators/s3.rst | 32 ++--
.../system/providers/amazon/aws}/example_s3.py | 183 +++++++++++++--------
2 files changed, 132 insertions(+), 83 deletions(-)
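For readers new to AIP-47 conversions, the heart of this change is replacing
hard-coded, environment-variable-driven resource names with a runtime test
context. A minimal sketch of that pattern, assuming the SystemTestContextBuilder
helper from tests/system/providers/amazon/aws/utils in the Airflow source tree
(the names below mirror the diff rather than defining anything new):

    # Sketch of the AIP-47 context pattern adopted by this commit. Assumes
    # the helper module tests/system/providers/amazon/aws/utils from the
    # Airflow source tree.
    from datetime import datetime

    from airflow import DAG
    from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

    # Builds a setup task that fetches the test environment ID at runtime.
    sys_test_context_task = SystemTestContextBuilder().build()

    with DAG(dag_id='example', schedule='@once', start_date=datetime(2021, 1, 1), catchup=False) as dag:
        test_context = sys_test_context_task()  # runs as the first task in the DAG
        env_id = test_context[ENV_ID_KEY]       # unique ID for this test environment
        # Resource names derive from the env ID, so parallel runs of the
        # same test do not collide on bucket names.
        bucket_name = f'{env_id}-s3-bucket'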
diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst
b/docs/apache-airflow-providers-amazon/operators/s3.rst
index bbb13d9763..3d594fcdcb 100644
--- a/docs/apache-airflow-providers-amazon/operators/s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/s3.rst
@@ -38,7 +38,7 @@ Create an Amazon S3 bucket
To create an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_create_bucket]
@@ -52,7 +52,7 @@ Delete an Amazon S3 bucket
To delete an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_bucket]
@@ -66,7 +66,7 @@ Set the tags for an Amazon S3 bucket
To set the tags for an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_put_bucket_tagging]
@@ -80,7 +80,7 @@ Get the tag of an Amazon S3 bucket
To get the tag set associated with an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_get_bucket_tagging]
@@ -94,7 +94,7 @@ Delete the tags of an Amazon S3 bucket
To delete the tags of an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_bucket_tagging]
@@ -108,7 +108,7 @@ Create an Amazon S3 object
To create a new (or replace) Amazon S3 object you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_create_object]
@@ -123,7 +123,7 @@ To copy an Amazon S3 object from one bucket to another you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator`.
The Amazon S3 connection used here needs to have access to both source and
destination bucket/key.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_copy_object]
@@ -137,7 +137,7 @@ Delete Amazon S3 objects
To delete one or multiple Amazon S3 objects you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_objects]
@@ -153,7 +153,7 @@ To transform the data from one Amazon S3 object and save it to another object yo
You can also apply an optional [Amazon S3 Select
expression](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html)
to select the data you want to retrieve from ``source_s3_key`` using
``select_expression``.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_file_transform]
@@ -169,7 +169,7 @@ To list all Amazon S3 prefixes within an Amazon S3 bucket you can use
See `here
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html>`__
for more information about Amazon S3 prefixes.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_list_prefixes]
@@ -184,7 +184,7 @@ To list all Amazon S3 objects within an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator`.
You can specify a ``prefix`` to filter the objects whose name begins with such
prefix.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_list]
@@ -208,7 +208,7 @@ Please keep in mind, especially when used to check a large volume of keys, that
To check one file:
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_single_key]
@@ -216,7 +216,7 @@ To check one file:
To check multiple files:
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_multiple_keys]
@@ -236,13 +236,13 @@ multiple files can match one key. The list of matched S3 object attributes conta
[{"Size": int}]
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_function_definition]
:end-before: [END howto_sensor_s3_key_function_definition]
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_function]
@@ -259,7 +259,7 @@ the inactivity period has passed with no increase in the number of objects you c
Note, this sensor will not behave correctly in reschedule mode,
as the state of the listed objects in the Amazon S3 bucket will be lost
between rescheduled invocations.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_keys_unchanged]
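Every hunk above is the same mechanical change: the exampleinclude directive now
points at the relocated file, while the :start-after:/:end-before: options keep
selecting the snippet between the [START ...] and [END ...] markers, so none of
the snippet boundaries move. A conceptual sketch of that marker-based extraction
(an illustration only, not the Sphinx implementation of exampleinclude):

    # Illustration only -- not the Sphinx ``exampleinclude`` implementation.
    # Shows how :start-after:/:end-before: select the code between the
    # [START ...] and [END ...] markers in a file.
    from pathlib import Path


    def extract_snippet(path: str, start_marker: str, end_marker: str) -> str:
        """Return the lines strictly between the two marker lines."""
        lines = Path(path).read_text().splitlines()
        start = next(i for i, line in enumerate(lines) if start_marker in line)
        end = next(i for i, line in enumerate(lines) if end_marker in line and i > start)
        return '\n'.join(lines[start + 1 : end])


    # e.g., with the new path used by the docs above:
    # extract_snippet(
    #     'tests/system/providers/amazon/aws/example_s3.py',
    #     '[START howto_operator_s3_create_bucket]',
    #     '[END howto_operator_s3_create_bucket]',
    # )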
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3.py b/tests/system/providers/amazon/aws/example_s3.py
similarity index 59%
rename from airflow/providers/amazon/aws/example_dags/example_s3.py
rename to tests/system/providers/amazon/aws/example_s3.py
index 5f5348cff1..d24ad72029 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3.py
+++ b/tests/system/providers/amazon/aws/example_s3.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-import os
from datetime import datetime
from airflow.models.baseoperator import chain
@@ -35,56 +34,71 @@ from airflow.providers.amazon.aws.operators.s3 import (
S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_s3'
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+DATA = '''
+ apple,0.5
+ milk,2.5
+ bread,4.0
+'''
-BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
-BUCKET_NAME_2 = os.environ.get('BUCKET_NAME_2', 'test-airflow-123456')
-KEY = os.environ.get('KEY', 'key')
-KEY_2 = os.environ.get('KEY_2', 'key2')
# Empty string prefix refers to the bucket root
# See what prefix is here https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
-PREFIX = os.environ.get('PREFIX', '')
-DELIMITER = os.environ.get('DELIMITER', '/')
-TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
-TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
-DATA = os.environ.get(
- 'DATA',
- '''
-apple,0.5
-milk,2.5
-bread,4.0
-''',
-)
+PREFIX = ''
+DELIMITER = '/'
+TAG_KEY = 'test-s3-bucket-tagging-key'
+TAG_VALUE = 'test-s3-bucket-tagging-value'
with DAG(
- dag_id='example_s3',
+ dag_id=DAG_ID,
+ schedule='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+
+ bucket_name = f'{env_id}-s3-bucket'
+ bucket_name_2 = f'{env_id}-s3-bucket-2'
+
+ key = f'{env_id}-key'
+ key_2 = f'{env_id}-key2'
+
# [START howto_sensor_s3_key_function_definition]
def check_fn(files: list) -> bool:
"""
- Example of custom check: check if all files are bigger than ``1kB``
+ Example of custom check: check if all files are bigger than ``20 bytes``
:param files: List of S3 object attributes.
:return: true if the criteria is met
:rtype: bool
"""
- return all(f.get('Size', 0) > 1024 for f in files)
+ return all(f.get('Size', 0) > 20 for f in files)
# [END howto_sensor_s3_key_function_definition]
# [START howto_operator_s3_create_bucket]
create_bucket = S3CreateBucketOperator(
- task_id='s3_create_bucket',
- bucket_name=BUCKET_NAME,
+ task_id='create_bucket',
+ bucket_name=bucket_name,
)
# [END howto_operator_s3_create_bucket]
+ create_bucket_2 = S3CreateBucketOperator(
+ task_id='create_bucket_2',
+ bucket_name=bucket_name_2,
+ )
+
# [START howto_operator_s3_put_bucket_tagging]
put_tagging = S3PutBucketTaggingOperator(
- task_id='s3_put_bucket_tagging',
- bucket_name=BUCKET_NAME,
+ task_id='put_tagging',
+ bucket_name=bucket_name,
key=TAG_KEY,
value=TAG_VALUE,
)
@@ -92,32 +106,40 @@ with DAG(
# [START howto_operator_s3_get_bucket_tagging]
get_tagging = S3GetBucketTaggingOperator(
- task_id='s3_get_bucket_tagging',
- bucket_name=BUCKET_NAME,
+ task_id='get_tagging',
+ bucket_name=bucket_name,
)
# [END howto_operator_s3_get_bucket_tagging]
# [START howto_operator_s3_delete_bucket_tagging]
delete_tagging = S3DeleteBucketTaggingOperator(
- task_id='s3_delete_bucket_tagging',
- bucket_name=BUCKET_NAME,
+ task_id='delete_tagging',
+ bucket_name=bucket_name,
)
# [END howto_operator_s3_delete_bucket_tagging]
# [START howto_operator_s3_create_object]
create_object = S3CreateObjectOperator(
- task_id="s3_create_object",
- s3_bucket=BUCKET_NAME,
- s3_key=KEY,
+ task_id="create_object",
+ s3_bucket=bucket_name,
+ s3_key=key,
data=DATA,
replace=True,
)
# [END howto_operator_s3_create_object]
+ create_object_2 = S3CreateObjectOperator(
+ task_id="create_object_2",
+ s3_bucket=bucket_name,
+ s3_key=key_2,
+ data=DATA,
+ replace=True,
+ )
+
# [START howto_operator_s3_list_prefixes]
list_prefixes = S3ListPrefixesOperator(
- task_id="s3_list_prefix_operator",
- bucket=BUCKET_NAME,
+ task_id="list_prefixes",
+ bucket=bucket_name,
prefix=PREFIX,
delimiter=DELIMITER,
)
@@ -125,8 +147,8 @@ with DAG(
# [START howto_operator_s3_list]
list_keys = S3ListOperator(
- task_id="s3_list_operator",
- bucket=BUCKET_NAME,
+ task_id="list_keys",
+ bucket=bucket_name,
prefix=PREFIX,
)
# [END howto_operator_s3_list]
@@ -134,87 +156,114 @@ with DAG(
# [START howto_sensor_s3_key_single_key]
# Check if a file exists
sensor_one_key = S3KeySensor(
- task_id="s3_sensor_one_key",
- bucket_name=BUCKET_NAME,
- bucket_key=KEY,
+ task_id="sensor_one_key",
+ bucket_name=bucket_name,
+ bucket_key=key,
)
# [END howto_sensor_s3_key_single_key]
# [START howto_sensor_s3_key_multiple_keys]
# Check if both files exist
sensor_two_keys = S3KeySensor(
- task_id="s3_sensor_two_keys",
- bucket_name=BUCKET_NAME,
- bucket_key=[KEY, KEY_2],
+ task_id="sensor_two_keys",
+ bucket_name=bucket_name,
+ bucket_key=[key, key_2],
)
# [END howto_sensor_s3_key_multiple_keys]
# [START howto_sensor_s3_key_function]
# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
- task_id="s3_sensor_key_function",
- bucket_name=BUCKET_NAME,
- bucket_key=KEY,
+ task_id="sensor_key_with_function",
+ bucket_name=bucket_name,
+ bucket_key=key,
check_fn=check_fn,
)
# [END howto_sensor_s3_key_function]
- # [START howto_sensor_s3_keys_unchanged]
- sensor_keys_unchanged = S3KeysUnchangedSensor(
- task_id="s3_sensor_one_key_size",
- bucket_name=BUCKET_NAME_2,
- prefix=PREFIX,
- inactivity_period=10,
- )
- # [END howto_sensor_s3_keys_unchanged]
-
# [START howto_operator_s3_copy_object]
copy_object = S3CopyObjectOperator(
- task_id="s3_copy_object",
- source_bucket_name=BUCKET_NAME,
- dest_bucket_name=BUCKET_NAME_2,
- source_bucket_key=KEY,
- dest_bucket_key=KEY_2,
+ task_id="copy_object",
+ source_bucket_name=bucket_name,
+ dest_bucket_name=bucket_name_2,
+ source_bucket_key=key,
+ dest_bucket_key=key_2,
)
# [END howto_operator_s3_copy_object]
# [START howto_operator_s3_file_transform]
- transforms_file = S3FileTransformOperator(
- task_id="s3_file_transform",
- source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
- dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
+ file_transform = S3FileTransformOperator(
+ task_id="file_transform",
+ source_s3_key=f's3://{bucket_name}/{key}',
+ dest_s3_key=f's3://{bucket_name_2}/{key_2}',
# Use `cp` command as transform script as an example
transform_script='cp',
replace=True,
)
# [END howto_operator_s3_file_transform]
+ # [START howto_sensor_s3_keys_unchanged]
+ sensor_keys_unchanged = S3KeysUnchangedSensor(
+ task_id="sensor_keys_unchanged",
+ bucket_name=bucket_name_2,
+ prefix=PREFIX,
+ inactivity_period=10, # inactivity_period in seconds
+ )
+ # [END howto_sensor_s3_keys_unchanged]
+
# [START howto_operator_s3_delete_objects]
delete_objects = S3DeleteObjectsOperator(
- task_id="s3_delete_objects",
- bucket=BUCKET_NAME_2,
- keys=KEY_2,
+ task_id="delete_objects",
+ bucket=bucket_name_2,
+ keys=key_2,
)
# [END howto_operator_s3_delete_objects]
# [START howto_operator_s3_delete_bucket]
delete_bucket = S3DeleteBucketOperator(
- task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
+ task_id='delete_bucket',
+ bucket_name=bucket_name,
+ force_delete=True,
)
# [END howto_operator_s3_delete_bucket]
+ delete_bucket.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_bucket_2 = S3DeleteBucketOperator(
+ task_id='delete_bucket_2',
+ bucket_name=bucket_name_2,
+ force_delete=True,
+ )
+ delete_bucket_2.trigger_rule = TriggerRule.ALL_DONE
chain(
+ # TEST SETUP
+ test_context,
+ # TEST BODY
create_bucket,
+ create_bucket_2,
put_tagging,
get_tagging,
delete_tagging,
create_object,
+ create_object_2,
list_prefixes,
list_keys,
[sensor_one_key, sensor_two_keys, sensor_key_with_function],
copy_object,
- transforms_file,
+ file_transform,
sensor_keys_unchanged,
+ # TEST TEARDOWN
delete_objects,
delete_bucket,
+ delete_bucket_2,
)
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
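
A closing note on the watcher wiring above: the teardown tasks run with
TriggerRule.ALL_DONE, so they succeed even when a test-body task fails, and
without a watcher the DAG run would still be marked successful. A minimal
sketch of the idea behind the helper (the canonical version lives at
tests/system/utils/watcher.py in the Airflow repo; this is an illustration,
not that file):

    # Sketch of the watcher idea; see tests/system/utils/watcher.py in the
    # Airflow repo for the canonical helper. With trigger_rule=ONE_FAILED
    # the task runs only when some upstream task failed, and by raising it
    # fails the DAG run even though the ALL_DONE teardown tasks succeeded.
    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule


    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        raise AirflowException('Failing task because one or more upstream tasks failed.')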