ewelinastr opened a new issue, #44196:
URL: https://github.com/apache/airflow/issues/44196

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.2
   
   ### What happened?
   
   When using DatabricksRunNowOperator with deferrable set to True, the 
max_active_tis_per_dag option does not limit the number of tasks that run 
concurrently.
   
   
   
   ### What you think should happen instead?
   
   When you set max_active_tis_per_dag, Airflow should ensure that: 
   `(the count of running and queued tasks) <= max_active_tis_per_dag`
    regardless of whether the operator is deferrable or non-deferrable.
   However, the limit only takes effect in non-deferrable mode. With a 
deferrable operator/setting it does not limit the number of concurrent task 
runs.
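
To make the expected invariant concrete, here is a toy model in plain Python. It is not Airflow scheduler code. It assumes, without this report having confirmed it, that the limit check counts only running and queued task instances, so a task that has deferred no longer occupies a slot. All names (`can_start`, `simulate`, the state constants) are hypothetical and exist only for this illustration.

```python
from collections import Counter

# Hypothetical state names for this toy model. They mirror Airflow's
# task-instance states, but nothing here is taken from the scheduler.
RUNNING, QUEUED, DEFERRED, SCHEDULED = "running", "queued", "deferred", "scheduled"


def can_start(states, limit, counted_states):
    """Return True if one more task instance may start under `limit`."""
    counts = Counter(states)
    active = sum(counts[s] for s in counted_states)
    return active < limit


def simulate(n_tasks, limit, counted_states):
    """Try to start tasks one by one; each started task defers immediately.

    No task ever finishes in this model, so the return value is the number
    of task instances in flight (running, queued, or deferred) at the end.
    """
    states = [SCHEDULED] * n_tasks
    for i in range(n_tasks):
        if can_start(states, limit, counted_states):
            states[i] = DEFERRED  # the task hands off and waits on its trigger
    return sum(s in (RUNNING, QUEUED, DEFERRED) for s in states)


# Assumption under test: only running + queued count against the limit.
observed = simulate(6, limit=1, counted_states={RUNNING, QUEUED})
# Behaviour the report expects: deferred tasks also count against the limit.
expected = simulate(6, limit=1, counted_states={RUNNING, QUEUED, DEFERRED})
print(observed, expected)
```

Under that assumption all six mapped tasks start even though the limit is 1, which matches the symptom described here. When deferred instances are counted as well, only one task is in flight.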
   
   ### How to reproduce
   
   I believe this is the case for all deferrable operators. However, I 
only tried it with DatabricksRunNowOperator. 
   To reproduce, use a deferrable operator with dynamically mapped tasks 
and max_active_tis_per_dag set to 1.
   With DatabricksRunNowOperator this starts many task runs at once (in 
Airflow and in Databricks).
   You can use the DAG code below:
   
       from airflow import DAG
       from airflow.decorators import task
       from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
       from pendulum import datetime  # pendulum's datetime accepts tz=...
   
       with DAG(
           dag_id="something",
           description="something",
           default_args=DEFAULT_ARGS,  # not shown in this report
           start_date=datetime(2024, 11, 19, 17, 0, 0, tz="UTC"),
       ) as dag:
   
           @task
           def prepare_dbx_params() -> list[dict[str, dict[str, str]]]:
               # Build one kwargs dict per mapped task instance.
               some_list = ["aa", "bb", "cc", "dd", "ee", "ff"]
               dbx_params = [
                   {
                       "python_named_params": {
                           "DBX_ARG1": something,
                           "DBX_ARG2": something[0],
                       }
                   }
                   for something in some_list
               ]
               return dbx_params
   
           dbx_input_params = prepare_dbx_params()
   
           # max_active_tis_per_dag=1 should allow one mapped task at a time.
           trigger_dbx_job = DatabricksRunNowOperator.partial(
               task_id="trigger-dbx-job-dag",
               job_name="<NAME OF JOB>",
               databricks_conn_id="<NAME OF DBX CONNECTION>",
               deferrable=True,
               max_active_tis_per_dag=1,
           ).expand_kwargs(dbx_input_params)
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   apache_airflow-2.7.3
   apache_airflow_providers_databricks-4.6.0
   
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
