potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766300443


   Surely.  
   
   I think in this case it would be enough to mock the underlying AWS calls so that they return the errors you expect while you are "polling". I believe those timeouts are thrown when you poll for the job status a fixed number of times and never see a final result; at that point a timeout exception is raised. From what I understand, the timeout is thrown here (datasync.py):
   
   ```
       def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int = 2 * 180) -> bool:
           """
           Wait for Task Execution status to be complete (SUCCESS/ERROR).
           The ``task_execution_arn`` must exist, or a boto3 ClientError will be raised.
   
           :param str task_execution_arn: TaskExecutionArn
           :param int max_iterations: Maximum number of iterations before timing out.
           :return: Result of task execution.
           :rtype: bool
           :raises AirflowTaskTimeout: If maximum iterations is exceeded.
           :raises AirflowBadRequest: If ``task_execution_arn`` is empty.
           """
           if not task_execution_arn:
               raise AirflowBadRequest("task_execution_arn not specified")
   
           status = None
           iterations = max_iterations
           while status is None or status in self.TASK_EXECUTION_INTERMEDIATE_STATES:
               task_execution = self.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
               status = task_execution["Status"]
               self.log.info("status=%s", status)
               iterations -= 1
               if status in self.TASK_EXECUTION_FAILURE_STATES:
                   break
               if status in self.TASK_EXECUTION_SUCCESS_STATES:
                   break
               if iterations <= 0:
                   break
               time.sleep(self.wait_interval_seconds)
   
           if status in self.TASK_EXECUTION_SUCCESS_STATES:
               return True
           if status in self.TASK_EXECUTION_FAILURE_STATES:
               return False
           if iterations <= 0:
               raise AirflowTaskTimeout("Max iterations exceeded!")
           raise AirflowException(f"Unknown status: {status}")  # Should never happen
   ```
   
   So the `describe_task_execution` method needs to be mocked to return a status that is either `None` or one of `TASK_EXECUTION_INTERMEDIATE_STATES`, combined with a low value of `max_iterations`, and then the hook should throw `AirflowTaskTimeout`.
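
A minimal sketch of that idea, not the actual Airflow test. To stay self-contained it does not import Airflow or boto3: `FakeDataSyncHook` and `TaskTimeout` are hypothetical stand-ins for the hook quoted above and for `AirflowTaskTimeout`, and the status names in `INTERMEDIATE_STATES` are illustrative assumptions. The part that carries over is the mocked `describe_task_execution`, which always reports an intermediate status.

```python
import time
from unittest import mock


class TaskTimeout(Exception):
    """Hypothetical stand-in for AirflowTaskTimeout."""


class FakeDataSyncHook:
    """Hypothetical, trimmed-down stand-in for the hook quoted above."""

    # Illustrative values only; the real hook defines its own state tuples.
    INTERMEDIATE_STATES = ("QUEUED", "PREPARING", "TRANSFERRING", "VERIFYING")
    SUCCESS_STATES = ("SUCCESS",)
    FAILURE_STATES = ("ERROR",)
    wait_interval_seconds = 0  # do not actually sleep in a unit test

    def __init__(self, client):
        self._client = client

    def get_conn(self):
        return self._client

    def wait_for_task_execution(self, task_execution_arn, max_iterations=2 * 180):
        status = None
        iterations = max_iterations
        while status is None or status in self.INTERMEDIATE_STATES:
            response = self.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
            status = response["Status"]
            iterations -= 1
            if iterations <= 0:
                break
            time.sleep(self.wait_interval_seconds)

        if status in self.SUCCESS_STATES:
            return True
        if status in self.FAILURE_STATES:
            return False
        if iterations <= 0:
            raise TaskTimeout("Max iterations exceeded!")
        raise RuntimeError(f"Unknown status: {status}")


def polls_before_timeout(max_iterations):
    """Return how many times the mocked client was polled before the timeout."""
    client = mock.Mock()
    # The mocked AWS call never leaves an intermediate state.
    client.describe_task_execution.return_value = {"Status": "TRANSFERRING"}
    hook = FakeDataSyncHook(client)
    try:
        hook.wait_for_task_execution("arn:example", max_iterations=max_iterations)
    except TaskTimeout:
        return client.describe_task_execution.call_count
    return None  # no timeout was raised
```

With a real hook, the same effect would come from patching `get_conn` to return such a mock client and asserting the timeout with `pytest.raises`.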
   
   
   However, when I looked there while writing this, I found that such a test already exists, done exactly as I described above, so you do not need to do anything (test_datasync.py):
   
   ```
       def test_wait_for_task_execution_timeout(self, mock_get_conn):
           # ### Configure mock:
           mock_get_conn.return_value = self.client
           # ### Begin tests:
   
           task_execution_arn = self.hook.start_task_execution(self.task_arn)
           with pytest.raises(AirflowTaskTimeout):
               result = self.hook.wait_for_task_execution(task_execution_arn, max_iterations=1)
               assert result is None
   ```
   
   
   

