cliebBS opened a new issue, #35563:
URL: https://github.com/apache/airflow/issues/35563

   ### Apache Airflow version
   
   2.7.3
   
   ### What happened
   
   When using the `RdsDeleteDbInstanceOperator` to delete an RDS instance in my DAG, it sometimes misses that the instance was deleted and leaves logs like:
   
   ```
   [2023-11-09, 19:32:14 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: deleting
   [2023-11-09, 19:32:44 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: deleting
   [2023-11-09, 19:33:14 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: deleting
   [2023-11-09, 19:33:44 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   [2023-11-09, 19:34:14 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   [2023-11-09, 19:34:44 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   [2023-11-09, 19:35:14 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   [2023-11-09, 19:35:44 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   [2023-11-09, 19:36:14 UTC] {waiter_with_logging.py:78} INFO - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 1] DB Instance status is: 
   ```
   
   In the above example, the actual instance was deleted at some point between 19:33:14 and 19:33:44, but the operator doesn't notice; it keeps polling for the instance's status until it reaches `waiter_max_attempts`, at which point the task fails. Retries of the operator exit immediately with the log message:
   
   ```
   [2023-11-09, 19:57:11 UTC] {taskinstance.py:1937} ERROR - [findings_sync manual__2023-11-09T17:58:26+00:00 prev_db--delete_db 2] Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/rds.py", line 690, in execute
       delete_db_instance = self.hook.conn.delete_db_instance(
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 535, in _api_call
       return self._make_api_call(operation_name, kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 980, in _make_api_call
       raise error_class(parsed_response, operation_name)
   botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the DeleteDBInstance operation: User: arn:aws:sts::account-num:assumed-role/an-iam-role/botocore-session is not authorized to perform: rds:DeleteDBInstance on resource: arn:aws:rds:us-east-1:account-num:db:test-db because no identity-based policy allows the rds:DeleteDBInstance action
   ```
   
   I *think* what's happening is that the deletion completes in the window between when the waiter times out and when it is rescheduled to run again. Because the RDS delete operation leaves no trace of the DB in the AWS API (unlike a terminated EMR cluster, for example), running the waiter again puts the operator in a weird state: it believes the resource exists but can't get a status for it.
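   If that's right, one fix would be for the delete waiter to treat a "not found" response as terminal success rather than as just another failed poll. Here's a minimal sketch of that idea, with a stubbed client and a stand-in for `botocore.exceptions.ClientError` so it runs without AWS; `wait_for_db_deletion` and `FakeRdsClient` are names I made up for illustration, not provider code:

```python
class ClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError (the real one takes a parsed response)."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def wait_for_db_deletion(client, db_id: str, max_attempts: int = 10) -> str:
    """Poll for the instance's status; a NotFound error means the delete finished."""
    for _ in range(max_attempts):
        try:
            status = client.describe_db_instance_status(db_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "DBInstanceNotFound":
                return "deleted"  # the instance is gone -> terminal success
            raise
        print(f"DB Instance status is: {status}")
    raise TimeoutError(f"{db_id} still present after {max_attempts} attempts")


class FakeRdsClient:
    """Reports 'deleting' twice, then the instance disappears mid-wait."""

    def __init__(self):
        self.calls = 0

    def describe_db_instance_status(self, db_id: str) -> str:
        self.calls += 1
        if self.calls <= 2:
            return "deleting"
        raise ClientError("DBInstanceNotFound")


print(wait_for_db_deletion(FakeRdsClient(), "test-db"))  # deleted
```

   With this shape, the race above would end in success instead of polling an empty status until `waiter_max_attempts` runs out.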
   
   ### What you think should happen instead
   
   The `RdsDeleteDbInstanceOperator` should always complete in the success 
state when the RDS instance is deleted.
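   Concretely, a retry that races an in-flight deletion should also end in success. A hedged sketch of that idempotent-delete behavior (stubbed so it runs standalone; `DBInstanceNotFoundFault` here is a stand-in for the real boto3 RDS client exception, and the helper names are mine):

```python
class DBInstanceNotFoundFault(Exception):
    """Stand-in for the boto3 RDS client's not-found exception."""


def delete_db_instance_idempotent(conn, db_id: str) -> str:
    """Issue the delete; treat 'already gone' the same as a successful delete."""
    try:
        conn.delete_db_instance(DBInstanceIdentifier=db_id)
        return "delete issued"
    except DBInstanceNotFoundFault:
        # A previous attempt (or a deletion that finished between polls) already removed it.
        return "already deleted"


class FakeConn:
    """Stub connection: the instance either exists or it doesn't."""

    def __init__(self, exists: bool):
        self.exists = exists

    def delete_db_instance(self, DBInstanceIdentifier: str):
        if not self.exists:
            raise DBInstanceNotFoundFault(DBInstanceIdentifier)
        self.exists = False


print(delete_db_instance_idempotent(FakeConn(exists=True), "test-db"))   # delete issued
print(delete_db_instance_idempotent(FakeConn(exists=False), "test-db"))  # already deleted
```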
   
   ### How to reproduce
   
   ```python
   import datetime
   from typing import Any
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.amazon.aws.operators.rds import (
       RdsCreateDbInstanceOperator,
       RdsDeleteDbInstanceOperator,
   )
   
   NUM_INSTANCES = 6  # set this higher to make it easier to reproduce in a single run
   
   # TODO: set these appropriately for your environment
   INSTANCE_PREFIX = "rds-delete-repro"
   
   # I don't think we can make this any smaller
   STORAGE_SIZE = 20
   INSTANCE_TYPE = "db.t4g.micro"
   
   
   @task()
   def get_names(num_instances: int) -> list[str]:
       return [f"{INSTANCE_PREFIX}-{x}" for x in range(num_instances)]
   
   
   @task()
   def get_rds_kwargs(names: list[str]) -> list[dict[str, Any]]:
       rds_kwargs: dict[str, Any] = {
           "EngineVersion": "14.8",
           "DBParameterGroupName": "default.postgres14",
           "MasterUsername": "postgres",
           "MasterUserPassword": "postgres",
           "StorageType": "gp3",
           "AllocatedStorage": STORAGE_SIZE,
           "Port": 5432,
           "NetworkType": "IPV4",
           "MultiAZ": False,
           "BackupRetentionPeriod": 0,  # disables automatic backup.
           "PubliclyAccessible": False,
           "AutoMinorVersionUpgrade": False,
           "DeletionProtection": False,
           "StorageEncrypted": True,
           # TODO: set these to valid values for your AWS account
           # "VpcSecurityGroupIds": ["sg-0ced89fe13d1ae49a"],
           # "DBSubnetGroupName": "dbsubnet-rds-dev-eks",
       }
   
       return [{"db_instance_identifier": name, "rds_kwargs": rds_kwargs} for name in names]
   
   
   with DAG(
       dag_id="rds_delete_repro",
       default_args={"retries": 1},
       start_date=datetime.datetime(2021, 1, 1),
       catchup=False,
       schedule_interval=None,
   ) as dag:
       db_instance_ids = get_names(NUM_INSTANCES)
   
       rds_kwargs = get_rds_kwargs(names=db_instance_ids)
   
       create_rds = RdsCreateDbInstanceOperator.partial(
           task_id="create_rds",
           db_instance_class=INSTANCE_TYPE,
           engine="postgres",
       ).expand_kwargs(rds_kwargs)
   
       delete_rds = RdsDeleteDbInstanceOperator.partial(
           task_id="delete_rds",
           rds_kwargs={
               "SkipFinalSnapshot": True,
               "DeleteAutomatedBackups": True,
           },
       ).expand(
           db_instance_identifier=db_instance_ids,
       )
   
       rds_kwargs >> create_rds >> delete_rds
   ```
   
   You can increase `NUM_INSTANCES` to spin up more RDS instances at once, which raises the odds of triggering the problem in a single run.
   
   ### Operating System
   
   Official Docker image
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==8.10.0
   ```
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   macOS Sonoma 14.0
   
   ```
   ❯ docker version
   Client: Docker Engine - Community
    Version:           24.0.7
    API version:       1.42 (downgraded from 1.43)
    Go version:        go1.21.3
    Git commit:        afdd53b4e3
    Built:             Thu Oct 26 07:06:42 2023
    OS/Arch:           darwin/arm64
    Context:           colima
   
   Server:
    Engine:
     Version:          23.0.6
     API version:      1.42 (minimum version 1.12)
     Go version:       go1.20.4
     Git commit:       9dbdbd4b6d7681bd18c897a6ba0376073c2a72ff
     Built:            Fri May 12 13:54:36 2023
     OS/Arch:          linux/arm64
     Experimental:     true
    containerd:
     Version:          v1.7.0
     GitCommit:        1fbd70374134b891f97ce19c70b6e50c7b9f4e0d
    runc:
     Version:          1.1.7
     GitCommit:        860f061b76bb4fc671f0f9e900f7d80ff93d4eb7
    docker-init:
     Version:          0.19.0
     GitCommit:
   ```
   ```
   ❯ docker-compose version
   Docker Compose version 2.23.0
   ```
   ```
   ❯ colima version
   colima version 0.5.6
   git commit: ceef812c32ab74a49df9f270e048e5dced85f932
   
   runtime: docker
   arch: aarch64
   client: v24.0.7
   server: v23.0.6
   ```
   ```
   ❯ limactl --version
   limactl version 0.18.0
   ```
   ```
   ❯ qemu-img --version
   qemu-img version 8.1.2
   Copyright (c) 2003-2023 Fabrice Bellard and the QEMU Project developers
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

