geoffjentry opened a new issue, #31582:
URL: https://github.com/apache/airflow/issues/31582
### Apache Airflow version
2.6.1
### What happened
When using a `@task.group` for the depth first expansion functionality, a
`@task.short_circuit` in that group throws a ForeignKeyViolation
```
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 1905, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.ForeignKeyViolation: insert or update on table "xcom"
violates foreign key constraint "xcom_task_instance_fkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_short_circuit,
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is
not present in table "task_instance".
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/base.py",
line 220, in execute
return_value = super().execute(context)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py",
line 268, in execute
self.skip(dag_run, execution_date, downstream_tasks)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 76, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/skipmixin.py",
line 139, in skip
XCom.set(
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 73, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py",
line 264, in set
session.flush()
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 3449, in flush
self._flush(objects)
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 3588, in _flush
with util.safe_reraise():
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 3549, in _flush
flush_context.execute()
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
rec.execute(self)
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
util.preloaded.orm_persistence.save_obj(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
_emit_insert_statements(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
line 1097, in _emit_insert_statements
c = connection._execute_20(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py",
line 334, in _execute_on_connection
return connection._execute_clauseelement(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 1577, in _execute_clauseelement
ret = self._execute_context(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 1948, in _execute_context
self._handle_dbapi_exception(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 2129, in _handle_dbapi_exception
util.raise_(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
line 1905, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.ForeignKeyViolation) insert
or update on table "xcom" violates foreign key constraint
"xcom_task_instance_fkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_short_circuit,
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is
not present in table "task_instance".
[SQL: INSERT INTO xcom (dag_run_id, task_id, map_index, key, dag_id, run_id,
value, timestamp) VALUES (%(dag_run_id)s, %(task_id)s, %(map_index)s, %(key)s,
%(dag_id)s, %(run_id)s, %(value)s, %(timestamp)s)]
[parameters: {'dag_run_id': 93, 'task_id': 'expansion_group.short_circuit',
'map_index': -1, 'key': 'skipmixin_key', 'dag_id': 'test_short_circuit',
'run_id': 'manual__2023-05-27T22:49:24.543812+00:00', 'value':
<psycopg2.extensions.Binary object at 0xffff7ae48b10>, 'timestamp':
datetime.datetime(2023, 5, 27, 22, 49, 57, 760293, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2023-05-27, 22:49:57 UTC] {taskinstance.py:1345} INFO - Marking task as
FAILED. dag_id=test_short_circuit, task_id=expansion_group.short_circuit,
map_index=0, execution_date=20230527T224924, start_date=20230527T224955,
end_date=20230527T224957
[2023-05-27, 22:49:57 UTC] {standard_task_runner.py:104} ERROR - Failed to
execute job 1765 for task expansion_group.short_circuit
((psycopg2.errors.ForeignKeyViolation) insert or update on table "xcom"
violates foreign key constraint "xcom_task_instance_fkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_short_circuit,
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is
not present in table "task_instance".
[SQL: INSERT INTO xcom (dag_run_id, task_id, map_index, key, dag_id, run_id,
value, timestamp) VALUES (%(dag_run_id)s, %(task_id)s, %(map_index)s, %(key)s,
%(dag_id)s, %(run_id)s, %(value)s, %(timestamp)s)]
[parameters: {'dag_run_id': 93, 'task_id': 'expansion_group.short_circuit',
'map_index': -1, 'key': 'skipmixin_key', 'dag_id': 'test_short_circuit',
'run_id': 'manual__2023-05-27T22:49:24.543812+00:00', 'value':
<psycopg2.extensions.Binary object at 0xffff7ae48b10>, 'timestamp':
datetime.datetime(2023, 5, 27, 22, 49, 57, 760293, tzinfo=Timezone('UTC'))}]
(Background on this error at: https://sqlalche.me/e/14/gkpj); 104)
```
### What you think should happen instead
The short circuit to have short circuited (or not) as appropriate
### How to reproduce
This DAG causes the behavior:
```
@dag(
dag_id="test_short_circuit",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
def test_short_circuit() -> None:
@task
def add_one(datum: int) -> int:
return datum + 1
@task.short_circuit
def short_circuit(datum: int) -> bool:
return datum > 3
@task
def subtract_one(datum: int) -> int:
return datum - 1
@task_group
def expansion_group(datum: int) -> None:
plus_one = add_one(datum)
foo = short_circuit(plus_one)
bar = subtract_one(plus_one)
foo >> bar
stuff = range(1, 10)
expansion_group.expand(datum=stuff)
test_short_circuit()
```
### Operating System
MacOS Ventura
### Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==3.2.1
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else
It's every time. If I convert to an old fashioned `.expand()` (i.e. serial
`.expand()` calls), it does not happen
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]