badrisrinivas9 opened a new issue, #30073:
URL: https://github.com/apache/airflow/issues/30073

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   Expanding of task group fails when the list is empty and there is a task 
which references mapped index in xcom pull of that group.
   
   
![image](https://user-images.githubusercontent.com/114723574/224769499-4a094b0c-8bbe-455f-9034-70c1cbfe2e3a.png)
    
    throws below error 
   Traceback (most recent call last):
     File "/opt/bitnami/airflow/venv/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/__main__.py", 
line 39, in main
       args.func(args)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
 line 52, in command
       return func(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 108, in wrapper
       return f(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py",
 line 73, in scheduler
       _run_scheduler_job(args=args)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py",
 line 43, in _run_scheduler_job
       job.run()
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/base_job.py",
 line 258, in run
       self._execute()
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py",
 line 759, in _execute
       self._run_scheduler_loop()
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py",
 line 885, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py",
 line 964, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py",
 line 78, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/__init__.py", 
line 384, in __iter__
       do = self.iter(retry_state=retry_state)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/__init__.py", 
line 351, in iter
       return fut.result()
     File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 
439, in result
       return self.__get_result()
     File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 
391, in __get_result
       raise self._exception
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py",
 line 87, in wrapped_function
       return func(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py",
 line 1253, in _schedule_all_dag_runs
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py",
 line 1322, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, 
execute_callbacks=False)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py",
 line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 563, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py",
 line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 710, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 793, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, 
dep_context=dep_context):
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py",
 line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1070, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, 
session=session):
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1091, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py",
 line 107, in get_dep_statuses
       yield from self._get_dep_statuses(ti, session, cxt)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py",
 line 93, in _get_dep_statuses
       yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, 
session=session)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py",
 line 219, in _evaluate_trigger_rule
       .filter(or_(*_iter_upstream_conditions()))
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py",
 line 191, in _iter_upstream_conditions
       map_indexes = _get_relevant_upstream_map_indexes(upstream_id)
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py",
 line 138, in _get_relevant_upstream_map_indexes
       return ti.get_relevant_upstream_map_indexes(
     File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 2652, in get_relevant_upstream_map_indexes
       ancestor_map_index = self.map_index * ancestor_ti_count // ti_count
   
   ### What you think should happen instead
   
   In case of empty list all the task group should be skipped
   
   ### How to reproduce
   
        from airflow.operators.bash import BashOperator
       from airflow.operators.python import get_current_context
       import pendulum
       
       from airflow.decorators import dag, task, task_group
       from airflow.operators.empty import EmptyOperator
       
       
       @dag(dag_id="test", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            schedule=None, catchup=False,
            render_template_as_native_obj=True
            )
       def testdag():
           task1 =EmptyOperator(task_id="get_attribute_can_json_mapping")
           @task
           def lkp_schema_output_mapping(**context):
               return 1
       
           @task
           def task2(**context):
               return 2
       
           @task
           def task3(table_list, **context):
               return []
       
           [task2(), task1,
            group2.expand(file_name=task3(table_list=task2()))]
       
       
       @task_group(
           group_id="group2"
       )
       def group2(file_name):
           @task
           def get_table_name(name):
               return "testing"
       
           table_name = get_table_name(file_name)
       
           run_this = BashOperator(
               task_id="run_this",
               bash_command="echo 
{{task_instance.xcom_pull(task_ids='copy_to_staging.get_table_name',"
                                      "map_indexes=task_instance.map_index)}}",
           )
       
           table_name >> run_this
       
       
       
       
       dag = testdag()
       
       if __name__ == "__main__":
           dag.test()
   
   
   ### Operating System
   
   Debian GNU/Linux 11
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==7.1.0
   apache-airflow-providers-apache-cassandra==3.1.0
   apache-airflow-providers-apache-drill==2.3.1
   apache-airflow-providers-apache-druid==3.3.1
   apache-airflow-providers-apache-hdfs==3.2.0
   apache-airflow-providers-apache-hive==5.1.1
   apache-airflow-providers-apache-pinot==4.0.1
   apache-airflow-providers-arangodb==2.1.0
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-cloudant==3.1.0
   apache-airflow-providers-cncf-kubernetes==5.1.1
   apache-airflow-providers-common-sql==1.3.3
   apache-airflow-providers-databricks==4.0.0
   apache-airflow-providers-docker==3.4.0
   apache-airflow-providers-elasticsearch==4.3.3
   apache-airflow-providers-exasol==4.1.3
   apache-airflow-providers-ftp==3.3.0
   apache-airflow-providers-google==8.8.0
   apache-airflow-providers-grpc==3.1.0
   apache-airflow-providers-hashicorp==3.2.0
   apache-airflow-providers-http==4.1.1
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-influxdb==2.1.0
   apache-airflow-providers-microsoft-azure==5.1.0
   apache-airflow-providers-microsoft-mssql==3.3.2
   apache-airflow-providers-mongo==3.1.1
   apache-airflow-providers-mysql==4.0.0
   apache-airflow-providers-neo4j==3.2.1
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-presto==4.2.1
   apache-airflow-providers-redis==3.1.0
   apache-airflow-providers-sendgrid==3.1.0
   apache-airflow-providers-sftp==4.2.1
   apache-airflow-providers-slack==7.2.0
   apache-airflow-providers-sqlite==3.3.1
   apache-airflow-providers-ssh==3.4.0
   apache-airflow-providers-trino==4.3.1
   apache-airflow-providers-vertica==3.3.1
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   I have manually changed below in the 
taskinstance.py(get_relevant_upstream_map_indexes method) and it ran fine. 
Please check if you can implement the same
   
           if ti_count is None or ti_count == 0:
               return None
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to