geoffjentry opened a new issue, #31582:
URL: https://github.com/apache/airflow/issues/31582

   ### Apache Airflow version
   
   2.6.1
   
   ### What happened
   
   When using a `@task.group` for the depth first expansion functionality, a 
`@task.short_circuit` in that group throws a ForeignKeyViolation
   
   ```
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 1905, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.ForeignKeyViolation: insert or update on table "xcom" 
violates foreign key constraint "xcom_task_instance_fkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_short_circuit, 
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is 
not present in table "task_instance".
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/base.py", 
line 220, in execute
       return_value = super().execute(context)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py",
 line 268, in execute
       self.skip(dag_run, execution_date, downstream_tasks)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 76, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/skipmixin.py",
 line 139, in skip
       XCom.set(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 73, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", 
line 264, in set
       session.flush()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
       compat.raise_(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
 line 456, in execute
       rec.execute(self)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
 line 245, in save_obj
       _emit_insert_statements(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
 line 1097, in _emit_insert_statements
       c = connection._execute_20(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 1948, in _execute_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 2129, in _handle_dbapi_exception
       util.raise_(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", 
line 1905, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
 line 736, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.ForeignKeyViolation) insert 
or update on table "xcom" violates foreign key constraint 
"xcom_task_instance_fkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_short_circuit, 
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is 
not present in table "task_instance".
   [SQL: INSERT INTO xcom (dag_run_id, task_id, map_index, key, dag_id, run_id, 
value, timestamp) VALUES (%(dag_run_id)s, %(task_id)s, %(map_index)s, %(key)s, 
%(dag_id)s, %(run_id)s, %(value)s, %(timestamp)s)]
   [parameters: {'dag_run_id': 93, 'task_id': 'expansion_group.short_circuit', 
'map_index': -1, 'key': 'skipmixin_key', 'dag_id': 'test_short_circuit', 
'run_id': 'manual__2023-05-27T22:49:24.543812+00:00', 'value': 
<psycopg2.extensions.Binary object at 0xffff7ae48b10>, 'timestamp': 
datetime.datetime(2023, 5, 27, 22, 49, 57, 760293, tzinfo=Timezone('UTC'))}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   [2023-05-27, 22:49:57 UTC] {taskinstance.py:1345} INFO - Marking task as 
FAILED. dag_id=test_short_circuit, task_id=expansion_group.short_circuit, 
map_index=0, execution_date=20230527T224924, start_date=20230527T224955, 
end_date=20230527T224957
   [2023-05-27, 22:49:57 UTC] {standard_task_runner.py:104} ERROR - Failed to 
execute job 1765 for task expansion_group.short_circuit 
((psycopg2.errors.ForeignKeyViolation) insert or update on table "xcom" 
violates foreign key constraint "xcom_task_instance_fkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_short_circuit, 
expansion_group.short_circuit, manual__2023-05-27T22:49:24.543812+00:00, -1) is 
not present in table "task_instance".
   [SQL: INSERT INTO xcom (dag_run_id, task_id, map_index, key, dag_id, run_id, 
value, timestamp) VALUES (%(dag_run_id)s, %(task_id)s, %(map_index)s, %(key)s, 
%(dag_id)s, %(run_id)s, %(value)s, %(timestamp)s)]
   [parameters: {'dag_run_id': 93, 'task_id': 'expansion_group.short_circuit', 
'map_index': -1, 'key': 'skipmixin_key', 'dag_id': 'test_short_circuit', 
'run_id': 'manual__2023-05-27T22:49:24.543812+00:00', 'value': 
<psycopg2.extensions.Binary object at 0xffff7ae48b10>, 'timestamp': 
datetime.datetime(2023, 5, 27, 22, 49, 57, 760293, tzinfo=Timezone('UTC'))}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj); 104)
   ```
   
   ### What you think should happen instead
   
   The short circuit to have short circuited (or not) as appropriate
   
   ### How to reproduce
   
   This DAG causes the behavior:
   
   ```
   @dag(
       dag_id="test_short_circuit",
       schedule=None,
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
   )
   def test_short_circuit() -> None:
       @task
       def add_one(datum: int) -> int:
           return datum + 1
   
       @task.short_circuit
       def short_circuit(datum: int) -> bool:
           return datum > 3
   
       @task
       def subtract_one(datum: int) -> int:
           return datum - 1
   
       @task_group
       def expansion_group(datum: int) -> None:
           plus_one = add_one(datum)
           foo = short_circuit(plus_one)
           bar = subtract_one(plus_one)
           foo >> bar
   
       stuff = range(1, 10)
       expansion_group.expand(datum=stuff)
   
   test_short_circuit()
   ```
   
   ### Operating System
   
   MacOS Ventura
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-sql==1.2.0
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-google==8.4.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-postgres==5.2.2
   apache-airflow-providers-sendgrid==3.0.0
   apache-airflow-providers-slack==6.0.0
   apache-airflow-providers-sqlite==3.2.1
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   It's every time. If I convert to an old fashioned `.expand()` (i.e. serial 
`.expand()` calls), it does not happen
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to