ricardojosepereira opened a new issue, #62783:
URL: https://github.com/apache/airflow/issues/62783

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.2
   
   ### What happened?
   
   Apache Airflow 3.1.2 is deployed on a dedicated on-prem RHEL 9 VM with the following providers:
   apache-airflow-providers-fab             3.0.1
   apache-airflow-providers-oracle          4.3.0
   apache-airflow-providers-ssh             4.2.1
   apache-airflow-providers-standard        1.9.1
   apache-airflow-providers-postgres        6.5.2
   pandas                                   3.0.0
   
   All DAG executions terminate with the following in the scheduler log:
   2026-03-03T09:35:00.510227Z [info     ] Trying to enqueue tasks: [<TaskInstance: remote_file_watcher.detect_files scheduled__2026-03-03T09:35:00+00:00 [scheduled]>] for executor: LocalExecutor(parallelism=32) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:740
   2026-03-03T09:35:00.513691Z [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend', 'MetastoreBackend'] count=2 loc=supervisor.py:1931
   2026-03-03T09:35:00.522952Z [warning  ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 1st time calling it. [airflow.sdk.api.client] loc=before.py:42
   2026-03-03T09:35:01.524535Z [warning  ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 2nd time calling it. [airflow.sdk.api.client] loc=before.py:42
   2026-03-03T09:35:02.800685Z [warning  ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 3rd time calling it. [airflow.sdk.api.client] loc=before.py:42
   2026-03-03T09:35:06.097942Z [warning  ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it. [airflow.sdk.api.client] loc=before.py:42
   2026-03-03T09:35:12.054394Z [info     ] Process exited                 [supervisor] exit_code=<Negsignal.SIGKILL: -9> loc=supervisor.py:709 pid=3999744 signal_sent=SIGKILL
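
   The gaps between the retry attempts can be read off the timestamps above. A minimal sketch that computes them; the timestamps are copied from the log, while the helper names `parse` and `gaps` are only illustrative:

```python
from datetime import datetime

# Timestamps copied from the scheduler log above: four `Client.request`
# attempts followed by the "Process exited" line.
EVENTS = [
    ("attempt 1", "2026-03-03T09:35:00.522952Z"),
    ("attempt 2", "2026-03-03T09:35:01.524535Z"),
    ("attempt 3", "2026-03-03T09:35:02.800685Z"),
    ("attempt 4", "2026-03-03T09:35:06.097942Z"),
    ("process exited", "2026-03-03T09:35:12.054394Z"),
]


def parse(ts: str) -> datetime:
    """Parse the log's UTC timestamp format (microseconds, trailing Z)."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


def gaps(events):
    """Return (label, seconds since the previous event) for each later event."""
    times = [(label, parse(ts)) for label, ts in events]
    return [
        (label, (t - prev_t).total_seconds())
        for (_, prev_t), (label, t) in zip(times, times[1:])
    ]


if __name__ == "__main__":
    for label, seconds in gaps(EVENTS):
        print(f"{label}: +{seconds:.3f}s")
```

   The waits grow from about 1 s to about 6 s, roughly 11.5 s in total between the first attempt and the line reporting the SIGKILL exit.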
   
   The task log file is created, but it is empty (size zero):
   
   $ cd dag_id\=remote_file_watcher/run_id\=scheduled__2026-03-03T09\:35\:00+00\:00/task_id\=detect_files/
   $ ll
   total 0
   -rw-r--r--. 1 airflow airflow 0 Mar  3 09:35 'attempt=1.log'
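
   To see how many task attempts are affected, the log tree can be scanned for zero-byte attempt files. A minimal sketch, assuming the `attempt=N.log` naming shown above; the function name and the log root path are hypothetical placeholders:

```python
from pathlib import Path


def find_empty_attempt_logs(log_root):
    """Return all zero-byte `attempt=N.log` files under log_root, sorted."""
    root = Path(log_root)
    return sorted(
        path
        for path in root.rglob("attempt=*.log")
        if path.is_file() and path.stat().st_size == 0
    )


if __name__ == "__main__":
    # Hypothetical location; replace with the configured base log folder.
    for path in find_empty_attempt_logs("/path/to/airflow/logs"):
        print(path)
```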
   
   
   ### What you think should happen instead?
   
   The DAG should run the SSH command, since all conditions for a successful run are met. Instead, this DAG and all other DAGs terminate with the same error.
   
   ### How to reproduce
   
   Create a DAG with the following code and let the scheduler run it:
   
   # dags/file_watcher_dag.py
   
   from airflow.decorators import dag, task
   from airflow.providers.ssh.hooks.ssh import SSHHook
   from airflow.operators.python import get_current_context
   from airflow.exceptions import AirflowSkipException
   from airflow.models import Variable
   
   from utils.datasets import FILES_CONFIG
   
   REMOTE_PATH = "/db/warehous/operador_automatico/eventos/"
   
   
   def check_remote_file(filename: str) -> bool:
       """Check if a file exists on the remote server via SSH"""
       ssh_hook = SSHHook(ssh_conn_id="Server22")
   
       with ssh_hook.get_conn() as ssh_client:
           stdin, stdout, stderr = ssh_client.exec_command(
               f"test -f {REMOTE_PATH}/{filename} && echo FOUND || echo NOT_FOUND"
           )
           # Use tolerant decoding in case remote output is not strict UTF-8
           result = stdout.read().decode("utf-8", errors="replace").strip()
           return result == "FOUND"
   
   
   def skip_if_already_processed(dataset_uri: str):
       """Skip emitting the dataset if it was already emitted today.
   
       Uses Airflow Variables so the state is shared across all DAG runs
       during the same logical date.
       """
       context = get_current_context()
       logical_date = context["logical_date"].date()
   
       var_key = f"{dataset_uri}_processed_{logical_date.isoformat()}"
   
       already_run = Variable.get(var_key, default_var="False")
       if already_run == "True":
           raise AirflowSkipException(f"{dataset_uri} already processed today")
   
       # mark as processed for today
       Variable.set(var_key, "True")
   
   
   @dag(
       schedule="*/5 * * * *",  # check every 5 minutes
       catchup=False,
       max_active_runs=1,  # only one DAG run at a time
       tags=["watcher"]
   )
   def remote_file_watcher():
   
       @task
       def detect_files():
           """Detect which datasets should be emitted based on remote files.
   
           Returns a list of dataset URIs for which the trigger file exists.
           """
           detected_dataset_uris = []
   
           # Open a single SSH connection and list all files once, to avoid
           # poking the server separately for each configured filename.
           ssh_hook = SSHHook(ssh_conn_id="Server22")
           with ssh_hook.get_conn() as ssh_client:
               stdin, stdout, stderr = ssh_client.exec_command(f"ls -1 {REMOTE_PATH}")
               # Use tolerant decoding to avoid UnicodeDecodeError if filenames
               # contain bytes that are not valid UTF-8
               output = stdout.read().decode("utf-8", errors="replace").splitlines()
   
           remote_files = set(output)
   
           for filename, dataset in FILES_CONFIG.items():
               if filename in remote_files:
                   print(f"{filename} detected and ready to emit dataset {dataset.uri}")
                   detected_dataset_uris.append(dataset.uri)
               else:
                   print(f"{filename} not found, will not emit {dataset.uri}")
   
           return detected_dataset_uris
   
       @task
       def emit_dataset(detected_dataset_uris, dataset_uri: str):
           """Emit a single dataset event if its file was detected and not yet processed today."""
           if dataset_uri not in detected_dataset_uris:
               raise AirflowSkipException("File for this dataset not detected, skipping emit")
   
           skip_if_already_processed(dataset_uri)
           print(f"Emitting dataset event: {dataset_uri}")
   
       detected = detect_files()
   
       # Create one emit task per dataset in FILES_CONFIG, with its own outlet,
       # but reusing the same emit_dataset implementation so it's scalable.
       for filename, dataset in FILES_CONFIG.items():
           emit_dataset.override(
               task_id=f"emit_{filename}",
               outlets=[dataset],
           )(detected, dataset_uri=dataset.uri)
   
   
   remote_file_watcher()
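
   Since the task log is empty, it may help to rule out the DAG's own logic separately from the execution environment. A minimal sketch of the matching step in `detect_files`, with no SSH or Airflow dependency. The function name `detect_dataset_uris` and the sample data are hypothetical, and the config is simplified to map filename to URI string rather than to a dataset object:

```python
def detect_dataset_uris(ls_output: str, files_config: dict) -> list:
    """Return the URIs whose trigger filename appears in the `ls -1` output.

    `files_config` maps filename -> dataset URI (plain strings) here, which
    is a simplification of FILES_CONFIG in the DAG above.
    """
    remote_files = set(ls_output.splitlines())
    return [
        uri
        for filename, uri in files_config.items()
        if filename in remote_files
    ]


if __name__ == "__main__":
    # Hypothetical sample data, not taken from the real server.
    sample_listing = "events_a.csv\nevents_b.csv\n"
    sample_config = {
        "events_a.csv": "file://example/events_a",
        "events_c.csv": "file://example/events_c",
    }
    print(detect_dataset_uris(sample_listing, sample_config))
```

   If this step behaves as expected on a real directory listing, the failure is more likely to lie outside the DAG code, which would be consistent with every other DAG terminating the same way.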
   
   
   ### Operating System
   
   Red Hat Enterprise Linux 9.7
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-fab             3.0.1
   apache-airflow-providers-oracle          4.3.0
   apache-airflow-providers-ssh             4.2.1
   apache-airflow-providers-standard        1.9.1
   apache-airflow-providers-postgres        6.5.2
   pandas                                   3.0.0
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

