mfukushima3 opened a new issue, #31292:
URL: https://github.com/apache/airflow/issues/31292
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Airflow version in Cloud Composer: composer-1.19.13-airflow-2.3.3
I have two DAGs for reading Stripe data that are identical except for their
schedule intervals. Each DAG first fetches a list of all the accounts in the
system, then uses that list to expand a dynamically mapped task that pulls
other Stripe data via connected accounts (balance transactions in this example).
The first DAG runs weekly and has catchup enabled going back to 2022-11-01.
This DAG runs without issues. The second DAG is a copy of the first with two
changes: it runs every 15 minutes, and catchup only goes back to the start of
the current day.
On some runs of the second (15 min) DAG, the dynamic task adds a mapped
instance with an empty map index. The scheduler then crashes, which takes the
entire environment down. The log gives this error:
`DETAIL: Key (dag_id, task_id, run_id, map_index)=(<DAG name>, <task name>,
scheduled__2023-05-12T19:00:00+00:00, 0) already exists.`
This error does not occur in an identical DAG that runs weekly.
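For readers unfamiliar with this message: its wording indicates a unique-key violation on the four columns listed, meaning a second row was inserted for a mapped instance that already existed. The sketch below only illustrates that mechanism with an in-memory SQLite table. The table layout, the `add_mapped_instance` helper, and the DAG and task names are assumptions made for illustration, not Airflow's actual schema or scheduler code.

```python
import sqlite3

# Simplified stand-in for a task-instance table (assumption): only the four
# key columns named in the error message are modeled.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)


def add_mapped_instance(dag_id, task_id, run_id, map_index):
    """Insert one row; return False if the composite key already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
                (dag_id, task_id, run_id, map_index),
            )
        return True
    except sqlite3.IntegrityError:
        return False


# Hypothetical DAG and task names; the run_id and map_index match the report.
key = ("example_dag", "example_task", "scheduled__2023-05-12T19:00:00+00:00", 0)
first_insert = add_mapped_instance(*key)   # new key, accepted
second_insert = add_mapped_instance(*key)  # same key again, rejected
row_count = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
```

In this sketch the rejected insert is caught and reported. In the reported environment the equivalent database error is what appears in the scheduler log before the crash.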
### What you think should happen instead
The task should have one fewer mapped instance than is shown.
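To make the expectation concrete: with N accounts returned by the upstream task, the mapped task should have exactly N instances, with map indexes 0 through N-1. The helper below is a hypothetical diagnostic sketch, not part of Airflow. It assumes the observed map indexes have already been collected (for example, read off the UI), and it assumes the extra "empty" instance appears as `-1`, the index Airflow uses for unmapped task instances.

```python
def unexpected_map_indexes(accounts, observed_map_indexes):
    """Return observed map indexes that do not correspond to any account.

    One mapped instance is expected per account, indexed 0..len(accounts)-1.
    """
    expected = set(range(len(accounts)))
    return [index for index in observed_map_indexes if index not in expected]


# Hypothetical account IDs and observed indexes, for illustration only.
accounts = ["acct_a", "acct_b", "acct_c"]
observed = [-1, 0, 1, 2]  # one more instance than there are accounts
extras = unexpected_map_indexes(accounts, observed)
```

Here `extras` contains only the surplus index, which corresponds to the one additional mapped instance described above.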
### How to reproduce
```
import logging
from datetime import datetime

import stripe
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

stripe.api_key = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"


@task()
def get_balance_transactions(account_id):
    # Fetch the balance transactions created within this run's data interval.
    context = get_current_context()
    dag_run_id = context["run_id"]
    interval_start = context["data_interval_start"]
    interval_end = context["data_interval_end"]
    created_dict = {"gte": interval_start, "lt": interval_end}
    logging.info(created_dict)
    response = stripe.BalanceTransaction.list(
        stripe_account=account_id, created=created_dict
    )
    transactions = []
    for transaction in response.auto_paging_iter():
        transaction_dict = transaction.to_dict()
        transaction_dict.update({"account_id": account_id})
        transactions.append(transaction_dict)
    return transactions


@task()
def list_of_accounts():
    # Collect every account ID; this list drives the mapped task below.
    response = stripe.Account.list()
    accounts = []
    for account in response.auto_paging_iter():
        account_id = account.to_dict()["id"]
        accounts.append(account_id)
    return accounts


default_args = {
    "owner": "owner",
    "start_date": datetime(2023, 5, 11),
}
dag_args = {
    "default_args": default_args,
    "description": "Stripe Payment Intents Events Ingestion",
    "schedule_interval": "*/15 * * * *",
    "catchup": True,
    "max_active_runs": 1,
    "tags": ["stripe", "payment_intents", "events"],
}


@dag(**dag_args)
def stripe_payment_intents_cdc():
    # The original snippet referenced these tasks as `tasks.<name>`; they are
    # defined above in this listing, so they are called directly here.
    accounts = list_of_accounts.override(task_id="get_accounts_list")()
    # Expand into one mapped task instance per account ID.
    balance_transactions = get_balance_transactions.partial().expand(
        account_id=accounts["return_value"]
    )
    accounts >> balance_transactions


# Instantiate the DAG so the scheduler can discover it.
stripe_payment_intents_cdc()
```
### Operating System
macOS
### Versions of Apache Airflow Providers
composer-1.19.13-airflow-2.3.3
### Deployment
Google Cloud Composer
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)