ronald-fenner opened a new issue, #31774:
URL: https://github.com/apache/airflow/issues/31774

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   When I delete an XCom with the delete button in the UI, or run `airflow db clean` on the XCom table, the `delete` method of a custom XCom backend does not seem to be called.
   
   ### What you think should happen instead
   
   I expect the `delete` method to be called so that the custom backend can remove the linked file.
   
   ### How to reproduce
   
   Using this custom backend:
   `
   from __future__ import annotations
   
   import warnings
   from typing import Any, Iterable, TYPE_CHECKING
   
   from airflow.exceptions import RemovedInAirflow3Warning
   from airflow.models.xcom import BaseXCom
   import json
   import os
   
   from airflow.utils.helpers import exactly_one
   from airflow.utils.json import XComDecoder, XComEncoder
   from airflow.utils.session import NEW_SESSION, provide_session
   from sqlalchemy.orm import Session
   
   
   class CustomXComBackendEFS(BaseXCom):
       DATA_FILE_PATH = '/opt/airflow/efs/data_files/xcoms'
       PREFIX = 'efs://'
   
       @staticmethod
       def serialize_value(
               value: Any,
               *,
               key: str | None = None,
               task_id: str | None = None,
               dag_id: str | None = None,
               run_id: str | None = None,
               map_index: int | None = None,
       ) -> Any:
   
           filename = "xcom_" + run_id.replace(':', '_').replace('+', '_') + ".json"
           file_path = f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}"
           os.makedirs(file_path, exist_ok=True)
           with open(file_path + '/' + filename, 'w') as handle:
               handle.write(json.dumps(value, cls=XComEncoder))
               handle.flush()
   
           return BaseXCom.serialize_value(value=f'{CustomXComBackendEFS.PREFIX}{file_path}/{filename}')
   
       # noinspection PyUnusedLocal
       @staticmethod
       def deserialize_value(result) -> Any:
           file_path = BaseXCom.deserialize_value(result=result)
           if isinstance(file_path, str):
               if file_path.startswith(CustomXComBackendEFS.PREFIX):
                   file_path = file_path.replace(CustomXComBackendEFS.PREFIX, '')
                   if file_path.startswith(CustomXComBackendEFS.DATA_FILE_PATH):
                       with open(file_path, 'r') as handle:
                           content = handle.read()
                           return json.loads(content, cls=XComDecoder)
   
           return result
   
       @classmethod
       @provide_session
       def delete(cls, xcoms, session: Session) -> None:
           # XCom is not imported in this module; BaseXCom also matches subclass rows.
           if isinstance(xcoms, BaseXCom):
               xcoms = [xcoms]
           for xcom in xcoms:
               file_path = BaseXCom.deserialize_value(result=xcom)
               if isinstance(file_path, str):
                   if file_path.startswith(CustomXComBackendEFS.PREFIX):
                       file_path = file_path.replace(CustomXComBackendEFS.PREFIX, '')
                       if file_path.startswith(CustomXComBackendEFS.DATA_FILE_PATH):
                           os.unlink(file_path)
           BaseXCom.delete(xcoms, session)
   
       @classmethod
       @provide_session
       def clear(
               cls,
               execution_date=None,
               dag_id=None,
               task_id=None,
               session=NEW_SESSION,
               *,
               run_id=None,
               map_index=None,
       ) -> None:
           from airflow.models import DagRun
   
           if dag_id is None:
               raise TypeError("clear() missing required argument: dag_id")
           if task_id is None:
               raise TypeError("clear() missing required argument: task_id")
   
           if not exactly_one(execution_date is not None, run_id is not None):
               raise ValueError(
                   f"Exactly one of run_id or execution_date must be passed. "
                   f"Passed execution_date={execution_date}, run_id={run_id}"
               )
   
           if execution_date is not None:
               message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
               warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
               run_id = (
                   session.query(DagRun.run_id)
                   .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                   .scalar()
               )
   
           filename = "xcom_" + run_id.replace(':', '_').replace('+', '_') + ".json"
           file_path = f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}"
           if os.path.exists(f"{file_path}/{filename}"):
               os.unlink(f"{file_path}/{filename}")
   
           query = session.query(cls).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
           if map_index is not None:
               query = query.filter_by(map_index=map_index)
           query.delete()
   `
   The `delete` method never seems to be called. The `clear` method does work: I can see the XCom disappear and reappear.
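
To check the file handling separately from Airflow, the path logic used by the backend above can be sketched as a few standalone functions. This is only a minimal sketch and not part of the backend: `xcom_file_path`, `resolve_stored_path`, and `remove_xcom_file` are hypothetical helper names, and the root directory is passed as a parameter instead of being read from `DATA_FILE_PATH`.

```python
import os

PREFIX = "efs://"  # same marker the backend stores in the XCom table


def xcom_file_path(root, dag_id, task_id, run_id):
    """Build the file path the same way serialize_value() and clear() do."""
    filename = "xcom_" + run_id.replace(":", "_").replace("+", "_") + ".json"
    return f"{root}/{dag_id}/{task_id}/{filename}"


def resolve_stored_path(stored, root):
    """Map a stored 'efs://...' reference to an on-disk path.

    Returns None when the value is not a string, lacks the prefix,
    or points outside root. Unlike the backend's str.replace(), this
    strips only the leading prefix.
    """
    if not isinstance(stored, str) or not stored.startswith(PREFIX):
        return None
    path = stored[len(PREFIX):]
    return path if path.startswith(root) else None


def remove_xcom_file(stored, root):
    """Delete the file behind a stored reference; return True if one was removed."""
    path = resolve_stored_path(stored, root)
    if path is None or not os.path.exists(path):
        return False
    os.unlink(path)
    return True
```

If `delete` were called for each removed row, a helper like `remove_xcom_file` is roughly what it would need to run against the deserialized value.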
   
   ### Operating System
   
   Linux 9e28031d19f0 5.15.49-linuxkit #1 SMP PREEMPT Tue Sep 13 07:51:32 UTC 
2022 x86_64 GNU/Linux
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow                           2.5.3
   apache-airflow-providers-amazon          8.0.0
   apache-airflow-providers-asana           2.1.0
   apache-airflow-providers-celery          3.1.0
   apache-airflow-providers-cncf-kubernetes 6.1.0
   apache-airflow-providers-common-sql      1.4.0
   apache-airflow-providers-databricks      4.1.0
   apache-airflow-providers-ftp             3.3.1
   apache-airflow-providers-google          10.0.0
   apache-airflow-providers-http            4.3.0
   apache-airflow-providers-imap            3.1.1
   apache-airflow-providers-jdbc            3.3.0
   apache-airflow-providers-mysql           5.0.0
   apache-airflow-providers-postgres        5.4.0
   apache-airflow-providers-redis           3.1.0
   apache-airflow-providers-salesforce      5.3.0
   apache-airflow-providers-slack           7.2.0
   apache-airflow-providers-snowflake       4.0.5
   apache-airflow-providers-sqlite          3.3.2
   apache-airflow-providers-ssh             3.6.0
   google-cloud-orchestration-airflow       1.4.1
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   I'm testing this in Docker using a custom image that we deploy onto a Kubernetes cluster. The backend is designed to store the XComs on an EFS volume attached to the cluster. Locally in Docker, that location is a volume mapped onto my local hard drive, so I can see the files.
   
   ### Anything else
   
   This happens every time.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
