tirkarthi opened a new issue, #27864:
URL: https://github.com/apache/airflow/issues/27864

   ### Apache Airflow version
   
   2.4.3
   
   ### What happened
   
   The `current_state` method on `TaskInstance` doesn't filter by `map_index`, so calling it on a mapped task instance fails with `MultipleResultsFound`: the query matches every expanded instance of the task in the run.
   
   
https://github.com/apache/airflow/blob/fb7c6afc8cb7f93909bd2e654ea185eb6abcc1ea/airflow/models/taskinstance.py#L708-L726
   
   ### What you think should happen instead
   
   The query should also filter on `map_index` so that it returns the state of a single `TaskInstance`.
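
As a rough illustration of the difference, here is a minimal sketch using the standard-library `sqlite3` module and a toy `task_instance` table rather than Airflow's real model or SQLAlchemy. The table layout, the task id `divide`, the states, and the helper names `states_without_map_index` and `state_with_map_index` are all assumptions made for the example; only the DAG id and run id come from the report.

```python
import sqlite3

# Toy stand-in for the task_instance table (assumed layout, not Airflow's schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    " dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, state TEXT,"
    " PRIMARY KEY (dag_id, task_id, run_id, map_index))"
)

RUN_ID = "scheduled__2022-11-22T00:00:00+00:00"

# Three expanded instances of one mapped task in the same run.
# The task id "divide" and the states are hypothetical.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("divide_by_zero", "divide", RUN_ID, 0, "failed"),
        ("divide_by_zero", "divide", RUN_ID, 1, "success"),
        ("divide_by_zero", "divide", RUN_ID, 2, "success"),
    ],
)


def states_without_map_index(dag_id, task_id, run_id):
    """Filter the way the reported query does: no map_index condition."""
    rows = conn.execute(
        "SELECT state FROM task_instance"
        " WHERE dag_id = ? AND task_id = ? AND run_id = ?"
        " ORDER BY map_index",
        (dag_id, task_id, run_id),
    ).fetchall()
    return [row[0] for row in rows]


def state_with_map_index(dag_id, task_id, run_id, map_index):
    """Filter on map_index as well, so exactly one row can match."""
    rows = conn.execute(
        "SELECT state FROM task_instance"
        " WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (dag_id, task_id, run_id, map_index),
    ).fetchall()
    if len(rows) != 1:
        raise LookupError(f"expected exactly one row, got {len(rows)}")
    return rows[0][0]


# Without map_index every expanded instance matches, which is what makes
# a "exactly one row" accessor such as .scalar() raise.
print(states_without_map_index("divide_by_zero", "divide", RUN_ID))
# With map_index the lookup is unambiguous.
print(state_with_map_index("divide_by_zero", "divide", RUN_ID, 1))
```

In the real method the equivalent change would presumably be one extra condition in the existing `.filter(...)` call; the sketch only shows why that condition is needed.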
   
   ### How to reproduce
   
   ```python
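   from airflow.models import TaskInstance
   from airflow.utils.session import create_session

   with create_session() as session:
       # Load one expanded instance (map_index == 1), then ask for its current state.
       print(
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_id == "divide_by_zero",
               TaskInstance.map_index == 1,
               TaskInstance.run_id == "scheduled__2022-11-22T00:00:00+00:00",
           )
           .scalar()
           .current_state()
       )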
    
   ---------------------------------------------------------------------------
   MultipleResultsFound                      Traceback (most recent call last)
   Input In [7], in <cell line: 1>()
         1 with create_session() as session:
   ----> 2     print(session.query(TaskInstance).filter(TaskInstance.dag_id == "divide_by_zero", TaskInstance.map_index == 1, TaskInstance.run_id == 'scheduled__2022-11-22T00:00:00+00:00').scalar().current_state())
   
   File ~/stuff/python/airflow/airflow/utils/session.py:75, in provide_session.<locals>.wrapper(*args, **kwargs)
        73 else:
        74     with create_session() as session:
   ---> 75         return func(*args, session=session, **kwargs)
   
   File ~/stuff/python/airflow/airflow/models/taskinstance.py:725, in TaskInstance.current_state(self, session)
       708 @provide_session
       709 def current_state(self, session: Session = NEW_SESSION) -> str:
       710     """
       711     Get the very latest state from the database, if a session is passed,
       712     we use and looking up the state becomes part of the session, otherwise
      (...)
       715     :param session: SQLAlchemy ORM Session
       716     """
       717     return (
       718         session.query(TaskInstance.state)
       719         .filter(
       720             TaskInstance.dag_id == self.dag_id,
       721             TaskInstance.task_id == self.task_id,
       722             TaskInstance.run_id == self.run_id,
       723             # TaskInstance.map_index == self.map_index
       724         )
   --> 725         .scalar()
       726     )
   
   File ~/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2803, in Query.scalar(self)
      2801 # TODO: not sure why we can't use result.scalar() here
      2802 try:
   -> 2803     ret = self.one()
      2804     if not isinstance(ret, collections_abc.Sequence):
      2805         return ret
   
   File ~/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2780, in Query.one(self)
      2762 def one(self):
      2763     """Return exactly one result or raise an exception.
      2764 
      2765     Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects
      (...)
      2778 
      2779     """
   -> 2780     return self._iter().one()
   
   File ~/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1162, in Result.one(self)
      1134 def one(self):
      1135     # type: () -> Row
      1136     """Return exactly one row or raise an exception.
      1137 
      1138     Raises :class:`.NoResultFound` if the result returns no
      (...)
      1160 
      1161     """
   -> 1162     return self._only_one_row(True, True, False)
   
   File ~/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/engine/result.py:620, in ResultInternal._only_one_row(self, raise_for_second_row, raise_for_none, scalar)
       618     if next_row is not _NO_ROW:
       619         self._soft_close(hard=True)
   --> 620         raise exc.MultipleResultsFound(
       621             "Multiple rows were found when exactly one was required"
       622             if raise_for_none
       623             else "Multiple rows were found when one or none "
       624             "was required"
       625         )
       626 else:
       627     next_row = _NO_ROW
   
   MultipleResultsFound: Multiple rows were found when exactly one was required
   
   ```
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

