baolsen commented on a change in pull request #6773: [AIRFLOW-6038] AWS DataSync example_dags added
URL: https://github.com/apache/airflow/pull/6773#discussion_r357500939
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -62,60 +80,102 @@ class AWSDataSyncCreateTaskOperator(BaseOperator):
         ``boto3.create_location_xyz(**create_destination_location_kwargs)``
         The xyz is determined from the prefix of destination_location_uri, eg 
``smb:/...` or ``s3:/...``
         Example:  ``{'S3BucketArn': ..., 'S3Config': {'BucketAccessRoleArn': 
...}, ...}``
+    :param dict update_task_kwargs:  If a suitable TaskArn is found or created,
+        it will be updated if ``update_task_kwargs`` is defined.
+        ``update_task_kwargs`` is used internally like this:
+        ``boto3.update_task(TaskArn=task_arn, **update_task_kwargs)``
+        Example:  ``{'Name': 'xyz', 'Options': ..., 'Excludes': ...}``
 
-    :raises AirflowException: If neither ``source_location_uri`` nor
-        ``destination_location_uri`` were specified.
+    :raises AirflowException: If ``task_arn`` was not specified, or if
+        either ``source_location_uri`` or ``destination_location_uri`` were
+        not specified.
     :raises AirflowException: If source or destination Location weren't found
         and could not be created.
-    :raises AirflowException: If Task creation fails.
+    :raises AirflowException: If Task creation, update, execution or delete 
fails.
     """
-    template_fields = ('source_location_uri',
-                       'destination_location_uri')
-    ui_color = '#44b5e2'
+    template_fields = (
+        "task_arn",
+        "source_location_uri",
+        "destination_location_uri",
+        "create_task_kwargs",
+        "create_source_location_kwargs",
+        "create_destination_location_kwargs",
+        "update_task_kwargs",
+    )
+    ui_color = "#44b5e2"
 
     @apply_defaults
     def __init__(
         self,
-        aws_conn_id='aws_default',
+        aws_conn_id="aws_default",
+        wait_interval_seconds=5,
+        task_arn=None,
         source_location_uri=None,
         destination_location_uri=None,
-        case_sensitive_location_search=True,
+        location_search_case_sensitive=True,
+        location_search_ignore_trailing_slash=True,
+        choose_task_callable=None,
+        choose_location_callable=None,
         create_task_kwargs=None,
         create_source_location_kwargs=None,
         create_destination_location_kwargs=None,
+        update_task_kwargs=None,
+        delete_task_after_execution=False,
         *args,
         **kwargs
     ):
         super().__init__(*args, **kwargs)
 
         # Assignments
         self.aws_conn_id = aws_conn_id
+        self.wait_interval_seconds = wait_interval_seconds
+
+        self.task_arn = task_arn
+
         self.source_location_uri = source_location_uri
         self.destination_location_uri = destination_location_uri
-        self.case_sensitive_location_search = case_sensitive_location_search
-        if create_task_kwargs:
-            self.create_task_kwargs = create_task_kwargs
-        else:
-            self.create_task_kwargs = dict()
+
+        self.location_search_case_sensitive = location_search_case_sensitive
+        self.location_search_ignore_trailing_slash = (
+            location_search_ignore_trailing_slash
+        )
+
+        self.choose_task_callable = choose_task_callable
+        self.choose_location_callable = choose_location_callable
+
+        self.create_task_kwargs = create_task_kwargs if create_task_kwargs 
else dict()
+        self.create_source_location_kwargs = dict()
         if create_source_location_kwargs:
             self.create_source_location_kwargs = create_source_location_kwargs
-        else:
-            self.create_source_location_kwargs = dict()
+        self.create_destination_location_kwargs = dict()
         if create_destination_location_kwargs:
             self.create_destination_location_kwargs = 
create_destination_location_kwargs
-        else:
-            self.create_destination_location_kwargs = dict()
+
+        self.update_task_kwargs = update_task_kwargs if update_task_kwargs 
else dict()
+        self.delete_task_after_execution = delete_task_after_execution
 
         # Validations
-        if not (self.source_location_uri and self.destination_location_uri):
+        valid = False
+        if self.task_arn:
+            valid = True
+        if self.source_location_uri and self.destination_location_uri:
+            valid = True
+        if not valid:
             raise AirflowException(
-                'Specify both source_location_uri and 
destination_location_uri')
+                "Specify task_arn or both source_location_uri and 
destination_location_uri"
 
 Review comment:
   Thanks, done :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to