TheoLauw commented on issue #56501:
URL: https://github.com/apache/airflow/issues/56501#issuecomment-3422296920

   Hi @tatiana! Thank you for helping us with this case.
   We are using astronomer-cosmos 1.10.2 and no astronomer-runtime (sorry for the earlier mistake). I also tried other versions and hit the same issue. For Airflow, I did not try version 2, but I tried a wide range of 3.x versions and unfortunately had the same issue.
   
   Here is everything needed to reproduce the issue:
   
   Environment:
   - Airflow: 3.1.0
   - astronomer-cosmos: 1.10.2
   - dbt-core: 1.11.0-b2
   - Executor: KubernetesExecutor (also reproduced when forcing CeleryExecutor per task)
   - Remote logging: enabled, to S3 (only the last retry's logs persist; earlier failed attempts lose their logs with ServerResponseError 'Not Found')
   - Python: 3.12
   - Issue: any failing dbt task inside a DbtTaskGroup with retries > 0 loses the logs of intermediate attempts
   
   Directory layout (simplified):
   dags/
     simple_dag.py
   plugins/
     airflow_data_eng/
       dbt_tasks.py
   test/
     dbt/
       test/
         dbt_project.yml
         profiles.yml
         packages.yml
         models/
        issue_fail.sql
   
   Sample files:
   
   simple_dag.py
   ````python
   from datetime import datetime
   from airflow import DAG
   from airflow_data_eng.dbt_tasks import DbtTaskGroup
   
   with DAG(
       dag_id="cosmos_repro_dag",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       dbt_task_group = DbtTaskGroup(tag="issues")
   ````
   
   dbt_tasks.py
   ````python
   from pathlib import Path
   from cosmos import DbtTaskGroup as BaseDbtTaskGroup
   from cosmos.constants import TestBehavior, InvocationMode, LoadMode
   from cosmos.config import (
       ProjectConfig,
       ProfileConfig,
       ExecutionConfig,
       ExecutionMode,
       RenderConfig,
   )
   
   class DbtTaskGroup(BaseDbtTaskGroup):
       def __init__(self, *, tag: str, **kwargs):
        project_dir = Path("/opt/airflow/dags/test/dbt/test")  # adjust if necessary
        super().__init__(
            project_config=ProjectConfig(
                install_dbt_deps=True,
                dbt_project_path=str(project_dir),
            ),
            profile_config=ProfileConfig(
                profile_name="test",
                target_name="staging",
                profiles_yml_filepath=str(project_dir / "profiles.yml"),
            ),
            execution_config=ExecutionConfig(
                execution_mode=ExecutionMode.LOCAL,
                # invocation_mode belongs to ExecutionConfig, not RenderConfig
                invocation_mode=InvocationMode.SUBPROCESS,
            ),
            render_config=RenderConfig(
                select=[f"tag:{tag}"],
                test_behavior=TestBehavior.AFTER_EACH,
                load_method=LoadMode.DBT_LS,
                dbt_deps=True,
            ),
            operator_args={"retries": 2},
            **kwargs,
        )
   ````
   
   dbt_project.yml
   ````yaml
   name: test
   version: 1.0.0
   profile: test
   model-paths: ["models"]
   ````
   
   profiles.yml
   ````yaml
   test:
     target: staging
     outputs:
       staging:
         type: snowflake
         account: thefork.eu-west-1
         user: PROD_AMALIA_SERVICE
         warehouse: STAGING_SELF_SERVICE_WAREHOUSE
         database: TEST_STAGING
         schema: SALES
         private_key: "{{ env_var('SNOWFLAKE_PRIVATE_KEY') }}"
         threads: 4
   ````
   
   packages.yml
   ````yaml
   packages: []
   ````
   
   models/issue_fail.sql
   ````sql
   {{ config(tags=["issues"]) }}
    select * from NON_EXISTENT_TABLE  -- causes the failure
   ````
   
   
   Reproduction steps:
    1. Deploy the above with remote logging to S3 enabled.
    2. Introduce a failing model (e.g. one referencing a non-existent table) tagged `issues`.
    3. Trigger the DAG and let it retry.
    4. Observe that only the final retry's logs are accessible; earlier attempt logs are missing. Worker logs show `ServerResponseError: Not Found` during `task_instances.finish`, plus leaked-semaphore warnings.
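   
   To check which attempts actually persisted, one can compare the objects in the S3 log bucket against the keys Airflow is expected to write. A minimal sketch, assuming Airflow's default `log_filename_template` (`dag_id=.../run_id=.../task_id=.../attempt=N.log`); the bucket prefix, run_id, and task_id below are placeholders, not values from this setup:
   
   ````python
   # Build the per-attempt log keys expected under the remote log prefix,
   # assuming the default Airflow log_filename_template. With retries=2
   # there should be 3 attempts, hence 3 log objects.
   def expected_log_keys(dag_id: str, run_id: str, task_id: str, retries: int,
                         prefix: str = "s3://my-airflow-logs") -> list[str]:
       """Return one expected log key per attempt (retries + 1 in total)."""
       return [
           f"{prefix}/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={n}.log"
           for n in range(1, retries + 2)
       ]

   # Placeholder run_id/task_id for illustration only.
   keys = expected_log_keys("cosmos_repro_dag", "manual__2025-01-01",
                            "issue_fail_run", retries=2)
   for k in keys:
       print(k)
   ````
   
   Listing the bucket under that prefix after the run and diffing against these keys shows exactly which attempts lost their logs.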
   
   Observed error excerpt (intermediate retry):
   ````text
   airflow.sdk.api.client.ServerResponseError: Server returned error
   message': 'Not Found', 'detail': {'detail': 'Not Found'}
   ````
   
   Confirmation tests:
   - Same behavior with InvocationMode.DBT_RUNNER.
   - Same behavior forcing operator_args["executor"] = "CeleryExecutor".
    - A standard BashOperator with the same retry settings uploads all attempt logs correctly.
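   
   The BashOperator control test can be sketched as a minimal DAG (names are illustrative, assuming Airflow 3's standard provider; this is not the exact file we ran):
   
   ````python
   # Hypothetical control DAG: a plain BashOperator that always fails,
   # with the same retries=2, to compare per-attempt log uploads
   # against the Cosmos task group.
   from datetime import datetime
   from airflow import DAG
   from airflow.providers.standard.operators.bash import BashOperator

   with DAG(
       dag_id="bash_retry_control_dag",  # hypothetical dag_id
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       BashOperator(
           task_id="always_fail",
           bash_command="exit 1",  # force failure so every attempt is exercised
           retries=2,
       )
   ````
   
   With this DAG, all three attempt logs appear in S3 as expected, which points at the Cosmos path rather than the remote logging setup itself.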
   
   Please let me know if you need more info.
   