TheoLauw commented on issue #56501:
URL: https://github.com/apache/airflow/issues/56501#issuecomment-3422296920
Hi @tatiana! Thank you for helping us with this case.
We are using astronomer-cosmos 1.10.2 and no astronomer-runtime (sorry for the earlier mistake), but I also tried other versions and hit the same issue. For Airflow, I didn't try version 2, but I tried a wide range of 3.x versions and unfortunately got the same result.
Here is everything needed to reproduce the issue:
Environment:
- Airflow: 3.1.0
- astronomer-cosmos: 1.10.2
- dbt-core: 1.11.0-b2
- Executor: KubernetesExecutor (also reproduced when forcing CeleryExecutor per task)
- Remote logging: enabled, to S3 (symptom: only the last attempt's logs persist; earlier failed attempts lose their logs with a `ServerResponseError` 'Not Found')
- Python: 3.12
- Issue: any failing dbt task inside a DbtTaskGroup with retries > 0 loses its intermediate attempt logs
Directory layout (simplified):
````text
dags/
  simple_dag.py
  test/
    dbt/
      test/
        dbt_project.yml
        profiles.yml
        packages.yml
        models/
          failing_model.sql
plugins/
  airflow_data_eng/
    dbt_tasks.py
````
Sample files:
simple_dag.py
````python
from datetime import datetime

from airflow import DAG

from airflow_data_eng.dbt_tasks import DbtTaskGroup

with DAG(
    dag_id="cosmos_repro_dag",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    dbt_task_group = DbtTaskGroup(tag="issues")
````
dbt_tasks.py
````python
from pathlib import Path

from cosmos import DbtTaskGroup as BaseDbtTaskGroup
from cosmos.config import (
    ExecutionConfig,
    ProfileConfig,
    ProjectConfig,
    RenderConfig,
)
from cosmos.constants import ExecutionMode, InvocationMode, LoadMode, TestBehavior


class DbtTaskGroup(BaseDbtTaskGroup):
    def __init__(self, *, tag: str, **kwargs):
        project_dir = Path("/opt/airflow/dags/test/dbt/test")  # adjust if necessary
        super().__init__(
            project_config=ProjectConfig(
                install_dbt_deps=True,
                dbt_project_path=str(project_dir),
            ),
            profile_config=ProfileConfig(
                profile_name="test",
                target_name="staging",
                profiles_yml_filepath=str(project_dir / "profiles.yml"),
            ),
            execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL),
            render_config=RenderConfig(
                select=[f"tag:{tag}"],
                test_behavior=TestBehavior.AFTER_EACH,
                load_method=LoadMode.DBT_LS,
                dbt_deps=True,
                invocation_mode=InvocationMode.SUBPROCESS,
            ),
            operator_args={"retries": 2},
            **kwargs,
        )
````
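(For context: `operator_args` is forwarded by Cosmos to every operator it generates, so each model's run and test task should inherit `retries=2`.)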
dbt_project.yml
````yaml
name: test
version: 1.0.0
profile: test
model-paths: ["models"]
````
profiles.yml
````yaml
test:
  target: staging
  outputs:
    staging:
      type: snowflake
      account: thefork.eu-west-1
      user: PROD_AMALIA_SERVICE
      warehouse: STAGING_SELF_SERVICE_WAREHOUSE
      database: TEST_STAGING
      schema: SALES
      private_key: "{{ env_var('SNOWFLAKE_PRIVATE_KEY') }}"
      threads: 4
````
packages.yml
````yaml
packages: []
````
models/failing_model.sql
````sql
{{ config(tags=["issues"]) }}
select * from NON_EXISTENT_TABLE -- triggers the failure
````
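(The model failure itself is reproducible outside Airflow, e.g. with `dbt build --select tag:issues`, so the dbt side is fine; the problem is only about which attempt logs persist.)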
Reproduction steps:
1. Deploy the above with remote logging to S3 enabled.
2. Introduce a failing model (e.g. one referencing a non-existent table) tagged `issues`.
3. Trigger the DAG and let it retry.
4. Observe: only the final attempt's logs are accessible; earlier attempt logs are missing. Worker logs show `ServerResponseError: Not Found` during `task_instances.finish`, plus leaked semaphore warnings. (A way to verify this from the bucket side is sketched below.)
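To double-check which attempts actually persisted, I list the per-attempt log objects directly (minimal sketch; it assumes the default Airflow log filename template `dag_id=.../run_id=.../task_id=.../attempt=N.log`, and the bucket name, run_id, and task_id below are placeholders):
````python
# Sketch: list per-attempt log files for one task instance in S3.
# Assumes the default Airflow log layout; bucket/prefix are placeholders --
# adjust them to your remote_base_log_folder and the actual run.
import boto3

BUCKET = "my-airflow-logs"  # placeholder bucket name
PREFIX = (
    "dag_id=cosmos_repro_dag/"
    "run_id=manual__2025-01-01T00:00:00+00:00/"  # placeholder run_id
    "task_id=failing_model_run/"  # placeholder task_id
)

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
# With retries=2 there should be attempt=1.log, attempt=2.log, and
# attempt=3.log; in practice only the last attempt's file is there.
````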
Observed error excerpt (intermediate retry):
````text
airflow.sdk.api.client.ServerResponseError: Server returned error
{'message': 'Not Found', 'detail': {'detail': 'Not Found'}}
````
Confirmation tests:
- Same behavior with InvocationMode.DBT_RUNNER.
- Same behavior when forcing operator_args["executor"] = "CeleryExecutor".
- A standard BashOperator with the same retry settings uploads all attempt logs correctly (control DAG sketched below).
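For reference, the control DAG for that last point looks like this (minimal sketch; dag_id and task are placeholders, and the import path is the Airflow 3 location of BashOperator in the standard provider):
````python
# Control DAG: same retry settings, plain BashOperator instead of Cosmos.
# All three attempt logs appear in S3 as expected with this one.
from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="bash_retry_control",  # placeholder dag_id
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    BashOperator(
        task_id="always_fail",
        bash_command="exit 1",  # force a failure so retries kick in
        retries=2,
    )
````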
Please let me know if you need more info.