baolsen commented on a change in pull request #6773: [AIRFLOW-6038] AWS
DataSync example_dags added
URL: https://github.com/apache/airflow/pull/6773#discussion_r357500939
##########
File path: airflow/providers/amazon/aws/operators/datasync.py
##########
@@ -62,60 +80,102 @@ class AWSDataSyncCreateTaskOperator(BaseOperator):
``boto3.create_location_xyz(**create_destination_location_kwargs)``
The xyz is determined from the prefix of destination_location_uri, eg
``smb:/...`` or ``s3:/...``
Example: ``{'S3BucketArn': ..., 'S3Config': {'BucketAccessRoleArn':
...}, ...}``
+ :param dict update_task_kwargs: If a suitable TaskArn is found or created,
+ it will be updated if ``update_task_kwargs`` is defined.
+ ``update_task_kwargs`` is used internally like this:
+ ``boto3.update_task(TaskArn=task_arn, **update_task_kwargs)``
+ Example: ``{'Name': 'xyz', 'Options': ..., 'Excludes': ...}``
- :raises AirflowException: If neither ``source_location_uri`` nor
- ``destination_location_uri`` were specified.
+ :raises AirflowException: If ``task_arn`` was not specified and
+ either ``source_location_uri`` or ``destination_location_uri`` was
+ not specified.
:raises AirflowException: If source or destination Location weren't found
and could not be created.
- :raises AirflowException: If Task creation fails.
+ :raises AirflowException: If Task creation, update, execution or delete
fails.
"""
- template_fields = ('source_location_uri',
- 'destination_location_uri')
- ui_color = '#44b5e2'
+ template_fields = (
+ "task_arn",
+ "source_location_uri",
+ "destination_location_uri",
+ "create_task_kwargs",
+ "create_source_location_kwargs",
+ "create_destination_location_kwargs",
+ "update_task_kwargs",
+ )
+ ui_color = "#44b5e2"
@apply_defaults
def __init__(
self,
- aws_conn_id='aws_default',
+ aws_conn_id="aws_default",
+ wait_interval_seconds=5,
+ task_arn=None,
source_location_uri=None,
destination_location_uri=None,
- case_sensitive_location_search=True,
+ location_search_case_sensitive=True,
+ location_search_ignore_trailing_slash=True,
+ choose_task_callable=None,
+ choose_location_callable=None,
create_task_kwargs=None,
create_source_location_kwargs=None,
create_destination_location_kwargs=None,
+ update_task_kwargs=None,
+ delete_task_after_execution=False,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
# Assignments
self.aws_conn_id = aws_conn_id
+ self.wait_interval_seconds = wait_interval_seconds
+
+ self.task_arn = task_arn
+
self.source_location_uri = source_location_uri
self.destination_location_uri = destination_location_uri
- self.case_sensitive_location_search = case_sensitive_location_search
- if create_task_kwargs:
- self.create_task_kwargs = create_task_kwargs
- else:
- self.create_task_kwargs = dict()
+
+ self.location_search_case_sensitive = location_search_case_sensitive
+ self.location_search_ignore_trailing_slash = (
+ location_search_ignore_trailing_slash
+ )
+
+ self.choose_task_callable = choose_task_callable
+ self.choose_location_callable = choose_location_callable
+
+ self.create_task_kwargs = create_task_kwargs if create_task_kwargs
else dict()
+ self.create_source_location_kwargs = dict()
if create_source_location_kwargs:
self.create_source_location_kwargs = create_source_location_kwargs
- else:
- self.create_source_location_kwargs = dict()
+ self.create_destination_location_kwargs = dict()
if create_destination_location_kwargs:
self.create_destination_location_kwargs =
create_destination_location_kwargs
- else:
- self.create_destination_location_kwargs = dict()
+
+ self.update_task_kwargs = update_task_kwargs if update_task_kwargs
else dict()
+ self.delete_task_after_execution = delete_task_after_execution
# Validations
- if not (self.source_location_uri and self.destination_location_uri):
+ valid = False
+ if self.task_arn:
+ valid = True
+ if self.source_location_uri and self.destination_location_uri:
+ valid = True
+ if not valid:
raise AirflowException(
- 'Specify both source_location_uri and
destination_location_uri')
+ "Specify task_arn or both source_location_uri and
destination_location_uri"
Review comment:
Thanks, done :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services