Ferdinanddb opened a new issue, #56691:
URL: https://github.com/apache/airflow/issues/56691

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I have the following DAG that emits Assets on a daily basis:
   ```python
   import os
   from datetime import datetime, timedelta
   from pathlib import Path
   
   from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
   from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
   from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
   from airflow.providers.sftp.sensors.sftp import SFTPSensor
   from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator
   from airflow.sdk import DAG, Asset, AssetAlias, task
   from airflow_richfox.custom.utils.check_date import should_run_mon_to_fri
   from airflow_richfox.custom.utils.slack import default_dag_failure_slack_webhook_notification
   
   PROJECT_ID = os.environ.get("PROJECT_ID")
   GCS_LANDING_ZONE_BUCKET = f"{PROJECT_ID}-landing-zone-euw4"
   GCS_STAGING_BUCKET = f"{PROJECT_ID}-staging-euw4"
   GCS_ARCHIVE_BUCKET = f"{PROJECT_ID}-archive-euw4"
   
   SFTP_FILE_NAME = "IVYDB.{{ data_interval_start | ds_nodash }}D.zip"
   
   BLOB_DESTINATION_PATH: str = Path(
       "{{ dag.dag_id }}", "{{ data_interval_start | ds_nodash }}", SFTP_FILE_NAME
   ).as_posix()
   
   SFTP_FILE_PATH = Path("/IvyDBUS/v6.0/Update/", SFTP_FILE_NAME).as_posix()
   
   GCP_CONN_ID = "google_cloud_default"
   
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 1,
       "start_date": datetime(2024, 9, 7),
       "retry_delay": timedelta(minutes=1),
   }
   
   
   # Update your DAG to use parallel processing
   with DAG(
       dag_id=Path(__file__).parent.name,
       doc_md=__doc__,
       schedule="00 7 * * *",
       default_args=default_args,
       max_active_runs=1,
       catchup=False,
       on_failure_callback=default_dag_failure_slack_webhook_notification,
   ) as dag:
       list_files_in_staging = GCSListObjectsOperator(
           task_id="list_files_in_staging",
           bucket=GCS_STAGING_BUCKET,
           prefix="{{ dag.dag_id }}/{{ data_interval_start | ds_nodash }}/",
           gcp_conn_id=GCP_CONN_ID,
       )
   
       
       @task(outlets=[AssetAlias("opentmetrics-update-iceberg-bronze-tables-dags-to-trigger")])
       def emit_airflow_assets(file_paths: list[str], outlet_events, **context):  # noqa: ANN001, ANN201
           """Emit Assets for all GCS files."""
           for file_path in file_paths:
               optionmetrics_table_name = Path(file_path).name.split(".")[0].lower()
               asset_name = f"staging-file.optionmetrics_{optionmetrics_table_name}_update"
               outlet_events[
                   AssetAlias("opentmetrics-update-iceberg-bronze-tables-dags-to-trigger")
               ].add(
                   Asset(asset_name),
                   extra={
                       "data_interval_start": context["data_interval_start"],
                       "data_interval_end": context["data_interval_end"],
                       "ds": context["ds"],
                       "ds_nodash": context["ds_nodash"],
                       "dag_id": context["dag"].dag_id,
                       "gcs_staging_file_path": file_path,
                   },
               )

       emit_airflow_assets_task = emit_airflow_assets(list_files_in_staging.output)
   
       # Set task dependencies
       list_files_in_staging >> emit_airflow_assets_task
   ```
   
   Each Asset event emitted by the DAG above triggers a 'child' DAG. An example of such a child DAG is:
   
   ```python
   import os
   from datetime import datetime, timedelta
   from pathlib import Path
   
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
   from airflow.sdk import DAG, Asset, Metadata, task
   from airflow_richfox.custom.utils.gcs import upload_spark_config_to_gcs
   from airflow_richfox.custom.utils.slack import default_dag_failure_slack_webhook_notification
   
   PROJECT_ID = os.environ.get("PROJECT_ID")
   GCS_LANDING_ZONE_BUCKET = f"{PROJECT_ID}-landing-zone-euw4"
   GCS_STAGING_BUCKET = f"{PROJECT_ID}-staging-euw4"
   GCS_ARCHIVE_BUCKET = f"{PROJECT_ID}-archive-euw4"
   
   OPTIONMETRICS_TABLENAME = "optionmetrics_ivyopprc_update"
   
   ICEBERG_CATALOG_NAME = "biglakeCatalog"
   ICEBERG_TABLE_REF = f"bronze.{OPTIONMETRICS_TABLENAME}"
   ICEBERG_FULL_TABLE_REF = f"{ICEBERG_CATALOG_NAME}.{ICEBERG_TABLE_REF}"
   
   # Define the Asset for this DAG
   asset_staging_file_optionmetrics_ivyopprc_update = Asset(f"staging-file.{OPTIONMETRICS_TABLENAME}")
   asset_table_bronze_optionmetrics_ivyopprc_update = Asset(f"x-iceberg://{ICEBERG_FULL_TABLE_REF}")
   
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 2,
       "start_date": datetime(2024, 9, 7),
       "retry_delay": timedelta(minutes=1),
       "weight_rule": "upstream",
       "max_active_tis_per_dag": 2,
   }
   
   
   # Update your DAG to use parallel processing
   with DAG(
       dag_id=Path(__file__).parent.name,
       dag_display_name="🟤 " + ICEBERG_FULL_TABLE_REF,
       doc_md=__doc__,
       schedule=asset_staging_file_optionmetrics_ivyopprc_update,
       default_args=default_args,
       max_active_runs=20,
       tags=[
           "optionmetrics",
           "bronze",
           OPTIONMETRICS_TABLENAME,
       ],
       params={
           "full_reload": False,
       },
       catchup=False,
       on_failure_callback=default_dag_failure_slack_webhook_notification,
   ) as dag:
       daily_changes_job = SparkKubernetesOperator(
           task_id="daily_changes_job",
           namespace="spark-operator",
           application_file="spark_app/daily_changes_job/spark_application_config.yml",
           kubernetes_conn_id="kubernetes_default",
           random_name_suffix=True,
           get_logs=True,
           reattach_on_restart=True,
           delete_on_termination=True,
           do_xcom_push=False,
           deferrable=False,
           retries=2,
           on_execute_callback=upload_spark_config_to_gcs,
       )
   
       archive_staging_file = GCSToGCSOperator(
           task_id="archive_staging_file",
           source_bucket=GCS_STAGING_BUCKET,
           source_object="{{ (triggering_asset_events.values() | first | last).extra['gcs_staging_file_path'] }}",  # noqa: E501
           destination_bucket=GCS_ARCHIVE_BUCKET,
           destination_object="bronze.optionmetrics_ivyopprc_update/",
           move_object=True,
           replace=True,
           gcp_conn_id="google_cloud_default",
       )
   
       @task(outlets=[asset_table_bronze_optionmetrics_ivyopprc_update])
       def emit_assets_if_none_failed(**context):  # noqa: ANN201
           """Emit assets if no tasks have failed."""
           yield Metadata(
               asset_table_bronze_optionmetrics_ivyopprc_update,
               extra={
                   "data_interval_start": context["triggering_asset_events"][
                       asset_staging_file_optionmetrics_ivyopprc_update
                   ][-1].extra["data_interval_start"],
                   "ds": context["triggering_asset_events"][
                       asset_staging_file_optionmetrics_ivyopprc_update
                   ][-1].extra["ds"],
               },
           )
   
       emit_assets = emit_assets_if_none_failed()
   
       daily_changes_job >> [emit_assets, archive_staging_file]
   ```
   
   As one can see, only one Asset object is referenced in the `schedule` parameter of the child DAG, so I expect each Asset event to trigger exactly one DagRun. Sometimes, however, a single DagRun references more than one source Asset event, as in the following screenshot:
   
   <img width="1827" height="863" alt="Image" src="https://github.com/user-attachments/assets/26410430-b6c6-44bc-a661-ab1170b72f3d" />
   
   This looks random, at least from my perspective. I wonder whether it happens because the scheduler or the DAG processor takes too long in its loop, so that several pending Asset events end up grouped into a single DagRun?
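
   To illustrate the hypothesis, here is a toy model I wrote (it is purely speculative and not Airflow's actual scheduling code; `schedule_runs` and `events_per_loop` are names I made up): if each scheduler loop consumes *all* pending Asset events for an asset-scheduled DAG into one DagRun, then events that accumulate between loop iterations get batched together.

   ```python
   from collections import deque


   def schedule_runs(events, events_per_loop):
       """Toy model of the suspected batching: on every loop iteration, the
       'scheduler' consumes all currently queued asset events into a single
       DagRun. `events_per_loop` is how many events arrive between iterations.
       """
       queue = deque()
       runs = []
       stream = iter(events)
       exhausted = False
       while not exhausted or queue:
           # Events that arrived since the previous loop iteration.
           for _ in range(events_per_loop):
               try:
                   queue.append(next(stream))
               except StopIteration:
                   exhausted = True
                   break
           if queue:
               # One DagRun consumes every queued event at once.
               runs.append(list(queue))
               queue.clear()
       return runs


   # Scheduler keeps up: one event per loop -> one source event per DagRun.
   print(schedule_runs(range(4), events_per_loop=1))  # [[0], [1], [2], [3]]
   # Scheduler falls behind: two events per loop -> events grouped per DagRun.
   print(schedule_runs(range(4), events_per_loop=2))  # [[0, 1], [2, 3]]
   ```

   If something like this is going on, the grouping would depend on scheduler loop timing, which would explain why it looks random from the outside.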
   
   Thank you if you can help.
   
   ### What you think should happen instead?
   
   When a DAG depends on a single Asset, one Asset event should trigger exactly one DagRun; a run should never consume more than one Asset event.
   
   ### How to reproduce
   
   Create a parent DAG which emits an Asset event via an AssetAlias object, with a unique `extra` field on every event.
   
   Create a child DAG which is scheduled using the Asset from the parent DAG.
   
   Backfill the parent DAG (my backfill created 500 DagRuns) with the Max 
Active Runs parameter set to 1.
   
   Most of the resulting child DagRuns will have a single source Asset event, but some of them will have several.
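
   For reference, a minimal pair of DAGs for the steps above could look like the sketch below. The DAG ids, asset, and alias names are made up for the repro; it only mirrors the structure of my real DAGs.

   ```python
   from datetime import datetime

   from airflow.sdk import DAG, Asset, AssetAlias, task

   repro_alias = AssetAlias("repro-alias")  # hypothetical alias name
   repro_asset = Asset("repro-asset")       # hypothetical asset name

   with DAG(
       dag_id="repro_parent",
       schedule="@daily",
       start_date=datetime(2024, 1, 1),
       max_active_runs=1,
       catchup=False,
   ):

       @task(outlets=[repro_alias])
       def emit(outlet_events, **context):  # noqa: ANN001, ANN201
           # One event per run, with a unique extra, as in my real parent DAG.
           outlet_events[repro_alias].add(repro_asset, extra={"ds": context["ds"]})

       emit()

   with DAG(
       dag_id="repro_child",
       schedule=repro_asset,  # single-Asset schedule
       start_date=datetime(2024, 1, 1),
       catchup=False,
   ):

       @task
       def count_source_events(**context):  # noqa: ANN201
           # Expectation: exactly one triggering event per DagRun.
           print(len(context["triggering_asset_events"][repro_asset]))

       count_source_events()
   ```

   Then backfill `repro_parent` over a few hundred days and watch `count_source_events` print values greater than 1 for some child runs.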
   
   ### Operating System
   
   Official airflow image
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

