feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS 
DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395985184
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -208,12 +217,41 @@ def execute(self, context):
 
         self.log.info("Using DataSync TaskArn %s", self.task_arn)
 
-        # Update the DataSync Task
+        # Update the DataSync Task definition
         if self.update_task_kwargs:
             self._update_datasync_task()
 
-        # Execute the DataSync Task
-        self._execute_datasync_task()
+        # Wait for the Task to be in a valid state to Start
+        self.task_status = self._wait_get_status_before_start()
+
+        self.log.info('Task status is %s.', self.task_status)
+        if self.task_status in self.TASK_STATUS_START:
+            self.log.info(
+                'The Task will be started because its status is in %s.',
+                self.TASK_STATUS_START)
+            # Start the DataSync Task
+            self._start_datasync_task()
+        elif self.task_status in self.TASK_STATUS_SKIP_START:
+            self.log.info(
+                'The Task will NOT be started because its status is in %s.',
+                self.TASK_STATUS_SKIP_START)
+            if not self.task_execution_arn:
+                task_description = self.get_hook().get_task_description(self.task_arn)
+                if 'CurrentTaskExecutionArn' in task_description:
+                    self.task_execution_arn = task_description['CurrentTaskExecutionArn']
+                else:
+                    raise AirflowException(
+                        'Starting the Task was skipped,'
+                        ' but no CurrentTaskExecutionArn was found.')
+        elif self.task_status in self.TASK_STATUS_FAIL:
+            raise AirflowException(
+                'Task cannot be started because its status is in %s.'
+                % self.TASK_STATUS_FAIL
+            )
+        else:
+            raise AirflowException('Unexpected task status %s.' % self.task_status)
 
 Review comment:
   ```suggestion
               raise AirflowException(f'Unexpected task status {self.task_status}')
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to