potiuk commented on pull request #13832:
URL: https://github.com/apache/airflow/pull/13832#issuecomment-766300443
Sure.
I think in this case it would be enough to mock the underlying AWS calls
to return the errors you expect while you are doing the "polling". I believe
those timeouts are thrown when you have polled for jobs x times over some period
and still do not see a result, at which point a timeout exception is thrown.
From what I understand, the timeout is thrown here (datasync.py):
```
def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int = 2 * 180) -> bool:
    """
    Wait for Task Execution status to be complete (SUCCESS/ERROR).

    The ``task_execution_arn`` must exist, or a boto3 ClientError will
    be raised.

    :param str task_execution_arn: TaskExecutionArn
    :param int max_iterations: Maximum number of iterations before
        timing out.
    :return: Result of task execution.
    :rtype: bool
    :raises AirflowTaskTimeout: If maximum iterations is exceeded.
    :raises AirflowBadRequest: If ``task_execution_arn`` is empty.
    """
    if not task_execution_arn:
        raise AirflowBadRequest("task_execution_arn not specified")

    status = None
    iterations = max_iterations
    while status is None or status in self.TASK_EXECUTION_INTERMEDIATE_STATES:
        task_execution = self.get_conn().describe_task_execution(TaskExecutionArn=task_execution_arn)
        status = task_execution["Status"]
        self.log.info("status=%s", status)
        iterations -= 1
        if status in self.TASK_EXECUTION_FAILURE_STATES:
            break
        if status in self.TASK_EXECUTION_SUCCESS_STATES:
            break
        if iterations <= 0:
            break
        time.sleep(self.wait_interval_seconds)

    if status in self.TASK_EXECUTION_SUCCESS_STATES:
        return True
    if status in self.TASK_EXECUTION_FAILURE_STATES:
        return False
    if iterations <= 0:
        raise AirflowTaskTimeout("Max iterations exceeded!")
    raise AirflowException(f"Unknown status: {status}")  # Should never happen
```
So the `describe_task_execution` method needs to be mocked to keep returning a
status in the (None, TASK_EXECUTION_INTERMEDIATE_STATES) set; with some lower value
of max_iterations the hook should then throw `AirflowTaskTimeout`.
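To illustrate the mocking approach, here is a minimal, self-contained sketch. It is not the real hook: the polling loop is simplified, `TaskTimeout` stands in for `AirflowTaskTimeout`, and the state names are assumed examples. The point is only that a `MagicMock` client pinned to an intermediate status forces the timeout path:

```python
# Minimal sketch (hypothetical names, not the real Airflow hook): a simplified
# polling loop plus a mock client whose describe_task_execution always returns
# an intermediate status, so max_iterations is exhausted and a timeout raised.
from unittest import mock

INTERMEDIATE_STATES = ("QUEUED", "LAUNCHING", "TRANSFERRING")  # assumed examples

class TaskTimeout(Exception):
    """Stand-in for AirflowTaskTimeout in this sketch."""

def wait_for_task_execution(client, arn, max_iterations=3):
    status = None
    iterations = max_iterations
    while status is None or status in INTERMEDIATE_STATES:
        # Each call to the mocked client returns the pinned intermediate status.
        status = client.describe_task_execution(TaskExecutionArn=arn)["Status"]
        iterations -= 1
        if iterations <= 0:
            break
    if status in INTERMEDIATE_STATES and iterations <= 0:
        raise TaskTimeout("Max iterations exceeded!")
    return status

client = mock.MagicMock()
client.describe_task_execution.return_value = {"Status": "TRANSFERRING"}

try:
    wait_for_task_execution(client, "arn:aws:datasync:...", max_iterations=2)
except TaskTimeout as exc:
    print(f"raised: {exc}")
```

Because the mock never leaves the intermediate state, the loop can only exit via the iteration limit, which is exactly the condition the timeout test needs to exercise.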
However, when I looked there while writing this, I found that it is in fact
already done exactly as I described above, so you do not need to do
anything (test_datasync.py):
```
def test_wait_for_task_execution_timeout(self, mock_get_conn):
    # ### Configure mock:
    mock_get_conn.return_value = self.client
    # ### Begin tests:
    task_execution_arn = self.hook.start_task_execution(self.task_arn)
    with pytest.raises(AirflowTaskTimeout):
        result = self.hook.wait_for_task_execution(task_execution_arn, max_iterations=1)
        assert result is None
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]