Ferdinanddb opened a new issue, #56691:
URL: https://github.com/apache/airflow/issues/56691
### Apache Airflow version
3.1.0
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
I have the following DAG that emits Assets on a daily basis:
```python
import os
from datetime import datetime, timedelta
from pathlib import Path
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator
from airflow.sdk import DAG, Asset, AssetAlias, task
from airflow_richfox.custom.utils.check_date import should_run_mon_to_fri
PROJECT_ID = os.environ.get("PROJECT_ID")
GCS_LANDING_ZONE_BUCKET = f"{PROJECT_ID}-landing-zone-euw4"
GCS_STAGING_BUCKET = f"{PROJECT_ID}-staging-euw4"
GCS_ARCHIVE_BUCKET = f"{PROJECT_ID}-archive-euw4"
SFTP_FILE_NAME = "IVYDB.{{ data_interval_start | ds_nodash }}D.zip"
BLOB_DESTINATION_PATH: str = Path(
"{{ dag.dag_id }}", "{{ data_interval_start | ds_nodash }}",
SFTP_FILE_NAME
).as_posix()
SFTP_FILE_PATH = Path("/IvyDBUS/v6.0/Update/", SFTP_FILE_NAME).as_posix()
GCP_CONN_ID = "google_cloud_default"
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"start_date": datetime(2024, 9, 7),
"retry_delay": timedelta(minutes=1),
}
# Update your DAG to use parallel processing
with DAG(
dag_id=Path(__file__).parent.name,
doc_md=__doc__,
schedule="00 7 * * *",
default_args=default_args,
max_active_runs=1,
catchup=False,
on_failure_callback=default_dag_failure_slack_webhook_notification,
) as dag:
list_files_in_staging = GCSListObjectsOperator(
task_id="list_files_in_staging",
bucket=GCS_STAGING_BUCKET,
prefix="{{ dag.dag_id }}/{{ data_interval_start | ds_nodash }}/",
gcp_conn_id=GCP_CONN_ID,
)
@task(outlets=[AssetAlias("opentmetrics-update-iceberg-bronze-tables-dags-to-trigger")])
def emit_airflow_assets(file_paths: list[str], outlet_events,
**context): # noqa: ANN001, ANN201
"""Emit Assets for all GCS files."""
for file_path in file_paths:
optionmetrics_table_name = Path(file_path).name.split(".")[0].lower()
asset_name = f"staging-file.optionmetrics_{optionmetrics_table_name}_update"
outlet_events[
AssetAlias("opentmetrics-update-iceberg-bronze-tables-dags-to-trigger")
].add(
Asset(asset_name),
extra={
"data_interval_start": context["data_interval_start"],
"data_interval_end": context["data_interval_end"],
"ds": context["ds"],
"ds_nodash": context["ds_nodash"],
"dag_id": context["dag"].dag_id,
"gcs_staging_file_path": f"{file_path}",
},
)
emit_airflow_assets_task = emit_airflow_assets(list_files_in_staging.output)
# Set task dependencies
list_files_in_staging >> emit_airflow_assets_task
```
Each Asset emitted by the DAG above will trigger a 'child' DAG, an example
of such DAG is:
```python
import os
from datetime import datetime, timedelta
from pathlib import Path
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.sdk import DAG, Asset, Metadata, task
from airflow_richfox.custom.utils.gcs import upload_spark_config_to_gcs
from airflow_richfox.custom.utils.slack import default_dag_failure_slack_webhook_notification
PROJECT_ID = os.environ.get("PROJECT_ID")
GCS_LANDING_ZONE_BUCKET = f"{PROJECT_ID}-landing-zone-euw4"
GCS_STAGING_BUCKET = f"{PROJECT_ID}-staging-euw4"
GCS_ARCHIVE_BUCKET = f"{PROJECT_ID}-archive-euw4"
OPTIONMETRICS_TABLENAME = "optionmetrics_ivyopprc_update"
ICEBERG_CATALOG_NAME = "biglakeCatalog"
ICEBERG_TABLE_REF = f"bronze.{OPTIONMETRICS_TABLENAME}"
ICEBERG_FULL_TABLE_REF = f"{ICEBERG_CATALOG_NAME}.{ICEBERG_TABLE_REF}"
# Define the Asset for this DAG
asset_staging_file_optionmetrics_ivyopprc_update = Asset(f"staging-file.{OPTIONMETRICS_TABLENAME}")
asset_table_bronze_optionmetrics_ivyopprc_update = Asset(f"x-iceberg://{ICEBERG_FULL_TABLE_REF}")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 2,
"start_date": datetime(2024, 9, 7),
"retry_delay": timedelta(minutes=1),
"weight_rule": "upstream",
"max_active_tis_per_dag": 2,
}
# Update your DAG to use parallel processing
with DAG(
dag_id=Path(__file__).parent.name,
dag_display_name="🟤 " + ICEBERG_FULL_TABLE_REF,
doc_md=__doc__,
schedule=asset_staging_file_optionmetrics_ivyopprc_update,
default_args=default_args,
max_active_runs=20,
tags=[
"optionmetrics",
"bronze",
OPTIONMETRICS_TABLENAME,
],
params={
"full_reload": False,
},
catchup=False,
on_failure_callback=default_dag_failure_slack_webhook_notification,
) as dag:
daily_changes_job = SparkKubernetesOperator(
task_id="daily_changes_job",
namespace="spark-operator",
application_file="spark_app/daily_changes_job/spark_application_config.yml",
kubernetes_conn_id="kubernetes_default",
random_name_suffix=True,
get_logs=True,
reattach_on_restart=True,
delete_on_termination=True,
do_xcom_push=False,
deferrable=False,
retries=2,
on_execute_callback=upload_spark_config_to_gcs,
)
archive_staging_file = GCSToGCSOperator(
task_id="archive_staging_file",
source_bucket=GCS_STAGING_BUCKET,
source_object="{{ (triggering_asset_events.values() | first | last).extra['gcs_staging_file_path'] }}",  # noqa: E501
destination_bucket=GCS_ARCHIVE_BUCKET,
destination_object="bronze.optionmetrics_ivyopprc_update/",
move_object=True,
replace=True,
gcp_conn_id="google_cloud_default",
)
@task(outlets=[asset_table_bronze_optionmetrics_ivyopprc_update])
def emit_assets_if_none_failed(**context): # noqa: ANN201
"""Emit assets if no tasks have failed."""
yield Metadata(
asset_table_bronze_optionmetrics_ivyopprc_update,
extra={
"data_interval_start": context["triggering_asset_events"][
asset_staging_file_optionmetrics_ivyopprc_update
][-1].extra["data_interval_start"],
"ds": context["triggering_asset_events"][
asset_staging_file_optionmetrics_ivyopprc_update
][-1].extra["ds"],
},
)
emit_assets = emit_assets_if_none_failed()
daily_changes_job >> [emit_assets, archive_staging_file]
```
As one can see, there is only one Asset object referenced in the `schedule`
parameter of the DAG, so I expect that one Asset will trigger one DagRun. But
sometimes, a DagRun has more than one Source Asset being referenced, as in the
following screenshot:
<img width="1827" height="863" alt="Image"
src="https://github.com/user-attachments/assets/26410430-b6c6-44bc-a661-ab1170b72f3d"
/>
This is a bit random, or at least from my perspective. I wonder if this
could be because the scheduler or DAG processor takes too much time to do its
loop, and then some Assets get grouped together?
Thank you if you can help.
### What you think should happen instead?
One Asset event should trigger one DagRun when the DAG depends only on a
single Asset. The DAG should not consume more than one Asset event.
### How to reproduce
Create a parent DAG which emits an Asset event via an AssetAlias object,
with the extra field always being unique.
Create a child DAG which is scheduled using the Asset from the parent DAG.
Backfill the parent DAG (my backfill created 500 DagRuns) with the Max
Active Runs parameter set to 1.
Most of the DAG runs will have a unique source asset, but some of them will
have many.
### Operating System
Official airflow image
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]