badrisrinivas9 opened a new issue, #30073: URL: https://github.com/apache/airflow/issues/30073
### Apache Airflow version 2.5.1 ### What happened Expanding a task group fails when the mapped list is empty and a task in that group references the mapped index in an XCom pull. The scheduler throws the error below: Traceback (most recent call last): File "/opt/bitnami/airflow/venv/bin/airflow", line 8, in <module> sys.exit(main()) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/__main__.py", line 39, in main args.func(args) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command return func(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 108, in wrapper return f(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 73, in scheduler _run_scheduler_job(args=args) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job job.run() File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 258, in run self._execute() File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 759, in _execute self._run_scheduler_loop() File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 885, in _run_scheduler_loop num_queued_tis = self._do_scheduling(session) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 964, in _do_scheduling callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 78, in wrapped_function for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/__init__.py", line 384, in __iter__ do = self.iter(retry_state=retry_state) File 
"/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/__init__.py", line 351, in iter return fut.result() File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 439, in result return self.__get_result() File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result raise self._exception File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 87, in wrapped_function return func(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1253, in _schedule_all_dag_runs callback_to_run = self._schedule_dag_run(dag_run, session) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1322, in _schedule_dag_run schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper return func(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 563, in update_state info = self.task_instance_scheduling_decisions(session) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper return func(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in task_instance_scheduling_decisions schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis( File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 793, in _get_ready_tis if not schedulable.are_dependencies_met(session=session, dep_context=dep_context): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper return func(*args, **kwargs) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1070, in 
are_dependencies_met for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1091, in get_failed_dep_statuses for dep_status in dep.get_dep_statuses(self, session, dep_context): File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 107, in get_dep_statuses yield from self._get_dep_statuses(ti, session, cxt) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 93, in _get_dep_statuses yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 219, in _evaluate_trigger_rule .filter(or_(*_iter_upstream_conditions())) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 191, in _iter_upstream_conditions map_indexes = _get_relevant_upstream_map_indexes(upstream_id) File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 138, in _get_relevant_upstream_map_indexes return ti.get_relevant_upstream_map_indexes( File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2652, in get_relevant_upstream_map_indexes ancestor_map_index = self.map_index * ancestor_ti_count // ti_count ### What you think should happen instead In case of empty list all the task group should be skipped ### How to reproduce from airflow.operators.bash import BashOperator from airflow.operators.python import get_current_context import pendulum from airflow.decorators import dag, task, task_group from airflow.operators.empty import EmptyOperator @dag(dag_id="test", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=None, catchup=False, render_template_as_native_obj=True ) def 
testdag(): task1 =EmptyOperator(task_id="get_attribute_can_json_mapping") @task def lkp_schema_output_mapping(**context): return 1 @task def task2(**context): return 2 @task def task3(table_list, **context): return [] [task2(), task1, group2.expand(file_name=task3(table_list=task2()))] @task_group( group_id="group2" ) def group2(file_name): @task def get_table_name(name): return "testing" table_name = get_table_name(file_name) run_this = BashOperator( task_id="run_this", bash_command="echo {{task_instance.xcom_pull(task_ids='copy_to_staging.get_table_name'," "map_indexes=task_instance.map_index)}}", ) table_name >> run_this dag = testdag() if __name__ == "__main__": dag.test() ### Operating System Debian GNU/Linux 11 ### Versions of Apache Airflow Providers apache-airflow-providers-amazon==7.1.0 apache-airflow-providers-apache-cassandra==3.1.0 apache-airflow-providers-apache-drill==2.3.1 apache-airflow-providers-apache-druid==3.3.1 apache-airflow-providers-apache-hdfs==3.2.0 apache-airflow-providers-apache-hive==5.1.1 apache-airflow-providers-apache-pinot==4.0.1 apache-airflow-providers-arangodb==2.1.0 apache-airflow-providers-celery==3.1.0 apache-airflow-providers-cloudant==3.1.0 apache-airflow-providers-cncf-kubernetes==5.1.1 apache-airflow-providers-common-sql==1.3.3 apache-airflow-providers-databricks==4.0.0 apache-airflow-providers-docker==3.4.0 apache-airflow-providers-elasticsearch==4.3.3 apache-airflow-providers-exasol==4.1.3 apache-airflow-providers-ftp==3.3.0 apache-airflow-providers-google==8.8.0 apache-airflow-providers-grpc==3.1.0 apache-airflow-providers-hashicorp==3.2.0 apache-airflow-providers-http==4.1.1 apache-airflow-providers-imap==3.1.1 apache-airflow-providers-influxdb==2.1.0 apache-airflow-providers-microsoft-azure==5.1.0 apache-airflow-providers-microsoft-mssql==3.3.2 apache-airflow-providers-mongo==3.1.1 apache-airflow-providers-mysql==4.0.0 apache-airflow-providers-neo4j==3.2.1 apache-airflow-providers-postgres==5.4.0 
apache-airflow-providers-presto==4.2.1 apache-airflow-providers-redis==3.1.0 apache-airflow-providers-sendgrid==3.1.0 apache-airflow-providers-sftp==4.2.1 apache-airflow-providers-slack==7.2.0 apache-airflow-providers-sqlite==3.3.1 apache-airflow-providers-ssh==3.4.0 apache-airflow-providers-trino==4.3.1 apache-airflow-providers-vertica==3.3.1 ### Deployment Other ### Deployment details _No response_ ### Anything else I manually added the check below to the get_relevant_upstream_map_indexes method in taskinstance.py, and it ran fine. Please check whether you can implement the same: if ti_count is None or ti_count == 0: return None ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
