mfukushima3 opened a new issue, #31292:
URL: https://github.com/apache/airflow/issues/31292

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow version in Cloud Composer: composer-1.19.13-airflow-2.3.3
   
   I have two DAGs for reading Stripe data that are exactly the same except for 
their scheduling intervals. First, they get a list of all the accounts in the 
system. Then, they use that list for a dynamic task to get other Stripe data 
via connected accounts (for this example, balance transactions).
   
   The first DAG runs weekly with catchup enabled back to 2022-11-01; it runs without issues. The second DAG is a copy of the first with two changes: it runs every 15 minutes, and its catchup only goes back to the start of the current day.
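
Not part of the original report, but to make the cadence concrete: a `*/15 * * * *` schedule with catchup back to midnight produces one run per completed 15-minute data interval, so by the 19:00 run in the log there is a long back-to-back queue (with `max_active_runs=1`). A quick sketch using plain `datetime` arithmetic, no Airflow required (the times are taken from the run_id in the error log):

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=15)
midnight = datetime(2023, 5, 12, 0, 0)   # start of the current day (catchup boundary)
now = datetime(2023, 5, 12, 19, 0)       # time of the failing run in the log

# One scheduled run per completed 15-minute data interval since midnight.
runs = []
start = midnight
while start + INTERVAL <= now:
    runs.append((start, start + INTERVAL))
    start += INTERVAL

print(len(runs))  # 76 catchup runs, processed one at a time
```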
   
   On some runs of the second (15-minute) DAG, the dynamic task adds a mapped instance with an empty map index. The scheduler then crashes, which takes down the entire environment. The log shows this error:
   `DETAIL:  Key (dag_id, task_id, run_id, map_index)=(<DAG name>, <task name>, scheduled__2023-05-12T19:00:00+00:00, 0) already exists.`
   
   This error does not occur in an identical DAG that runs weekly.
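
For context (my reading, not from the report itself): the `already exists` message is Postgres rejecting a violation of the `task_instance` table's primary key, which as of Airflow 2.3 is the composite `(dag_id, task_id, run_id, map_index)`. A minimal, Airflow-free sketch of the same constraint, with SQLite standing in for Postgres and the table reduced to its key columns:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE task_instance (
           dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
           PRIMARY KEY (dag_id, task_id, run_id, map_index))"""
)

row = ("stripe_payment_intents_cdc", "get_balance_transactions",
       "scheduled__2023-05-12T19:00:00+00:00", 0)
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, ?)", row)

# Inserting a second mapped instance with the same map_index trips the
# primary-key constraint -- the analogue of what the scheduler crashes on.
try:
    conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, ?)", row)
    duplicated = False
except sqlite3.IntegrityError as exc:
    duplicated = True
    print("rejected duplicate:", exc)
```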
   
   ### What you think should happen instead
   
   The task should have one fewer mapped instance than is shown; the duplicate mapped instance should not be created.
   
   ### How to reproduce
   
    ```
    import logging
    from datetime import datetime

    import stripe
    from airflow.decorators import dag, task
    from airflow.operators.python import get_current_context

    stripe.api_key = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"


    @task()
    def get_balance_transactions(account_id):
        context = get_current_context()
        dag_run_id = context["run_id"]
        interval_start = context["data_interval_start"]
        interval_end = context["data_interval_end"]

        # Only fetch transactions created within this run's data interval.
        created_dict = {"gte": interval_start, "lt": interval_end}
        logging.info(created_dict)

        response = stripe.BalanceTransaction.list(stripe_account=account_id, created=created_dict)

        transactions = []
        for transaction in response.auto_paging_iter():
            transaction_dict = transaction.to_dict()
            transaction_dict.update({"account_id": account_id})
            transactions.append(transaction_dict)

        return transactions


    @task()
    def list_of_accounts():
        response = stripe.Account.list()

        accounts = []
        for account in response.auto_paging_iter():
            account_id = account.to_dict()["id"]
            accounts.append(account_id)

        return accounts


    default_args = {
        "owner": "owner",
        "start_date": datetime(2023, 5, 11),
    }
    dag_args = {
        "default_args": default_args,
        "description": "Stripe Payment Intents Events Ingestion",
        "schedule_interval": "*/15 * * * *",
        "catchup": True,
        "max_active_runs": 1,
        "tags": ["stripe", "payment_intents", "events"],
    }


    @dag(**dag_args)
    def stripe_payment_intents_cdc():
        accounts = list_of_accounts.override(task_id="get_accounts_list")()

        # Dynamic task mapping: one mapped instance per account ID.
        balance_transactions = get_balance_transactions.partial().expand(
            account_id=accounts
        )

        accounts >> balance_transactions


    stripe_payment_intents_cdc()
    ```
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   composer-1.19.13-airflow-2.3.3
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
