StrawberryOwl opened a new issue, #46376:
URL: https://github.com/apache/airflow/issues/46376

   ### Apache Airflow Provider(s)
   
   apache-kafka
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-kafka==1.7.0
   
   ### Apache Airflow version
   
   2.10.4
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Used "Docker version 27.4.0, build bde2b89" with the official docker-compose file from [airflow-website](https://airflow.apache.org/docs/apache-airflow/2.10.4/docker-compose.yaml) as a basis, with some changes to install "apache-airflow[apache-kafka]" and to add Kafka containers to the compose file.
   
   ### What happened
   
   I am designing a workflow that sends and receives Kafka messages. For reception I am using AwaitMessageSensor, to which you have to pass an apply_function in dot-string notation, as used by importlib. Directly after startup of the Airflow system everything works fine, but when the code of apply_function is changed, the changes are not reflected by the trigger that is actually executed. The problem becomes obvious if the function is renamed: the trigger crashes with the ImportError `Module "myDAG" does not define a "my_apply_function" attribute/class`, although my_apply_function is defined in the DAG file currently in use. I verified that the most recent version of the file is correctly synced to the triggerer container (mounted into the container as per default) and that it can also be found in the database.
   
   ### What you think should happen instead
   
   Changing the apply function should be correctly reflected by the triggerer.
   
   ### How to reproduce
   
   Take a working Kafka/Airflow setup, use an AwaitMessageSensor, and rename its apply_function. (I think this problem is quite general, so I did not provide code; if someone really needs my docker-compose file and a minimal DAG, I can provide them later.)
   
   ### Anything else
   
   I researched for quite a while before getting anywhere near a solution. The `TriggerRunner` uses a cache to look up trigger classes by their classpath (see [source code](https://github.com/apache/airflow/blob/8a3757b99e247d2f09a86d12e6aff8d525a3f3e7/airflow/jobs/triggerer_job_runner.py#L759)). I read #31743, which was closed after [updates](https://github.com/apache/airflow/issues/31743#issuecomment-1598786020) to the documentation of deferrable operators, and I also dug into the source code of AwaitMessageSensor and AwaitMessageTrigger, where the string passed via apply_function is imported with importlib. So I expected my new code to be imported each time I used the sensor.
   
   My understanding of this problem is far from complete, but it seems to me that this expectation is not met because the `TriggerRunner` caches the trigger classes, so my apply_function is only imported by importlib once. If that is not true, please correct me.
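   For reference, the underlying Python import behaviour can be reproduced without Airflow. This is a minimal sketch assuming the trigger resolves apply_function with something equivalent to `importlib.import_module` followed by `getattr` (the quoted error text has that shape); `resolve_dotted_path` and `my_dag_module` are hypothetical names for illustration, not Airflow code. Once a module is in `sys.modules`, a long-running process keeps serving the old module object even after the file on disk changes, which would be consistent with the behaviour described above even if no trigger-class cache were involved.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def resolve_dotted_path(dotted_path: str):
    """Resolve "module.attr" via import_module + getattr (hypothetical helper, not Airflow code)."""
    module_path, attr_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)  # returns the sys.modules entry after the first import
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from None


def demo_stale_import() -> list[str]:
    """Import a module, rename a function in its file, then resolve the new name."""
    old_flag = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # no .pyc files, so the reload below re-reads the source
    results = []
    with tempfile.TemporaryDirectory() as tmp:
        module_file = Path(tmp) / "my_dag_module.py"
        sys.path.insert(0, tmp)
        try:
            # First version of the "DAG file".
            module_file.write_text("def old_apply_function(msg):\n    return msg\n")
            importlib.invalidate_caches()
            resolve_dotted_path("my_dag_module.old_apply_function")
            results.append("v1 ok")

            # Edit the file on disk: the function is renamed.
            module_file.write_text("def my_apply_function(msg):\n    return msg.upper()\n")
            try:
                resolve_dotted_path("my_dag_module.my_apply_function")
            except ImportError as exc:
                results.append(str(exc))  # the process still holds the old module object

            # An explicit reload (or a fresh process) is what picks up the edit.
            importlib.reload(sys.modules["my_dag_module"])
            results.append(resolve_dotted_path("my_dag_module.my_apply_function")("hello"))
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("my_dag_module", None)
            sys.dont_write_bytecode = old_flag
    return results


if __name__ == "__main__":
    for line in demo_stale_import():
        print(line)
```

   Run as a script, it prints `v1 ok`, then an ImportError message of the same form as in this report, then `HELLO`.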
   
   I propose that at least the documentation should be updated to point out explicitly that changing apply_function requires a restart of the triggerer. The aforementioned documentation changes were only made in the section about writing your own triggers, which I did not consider, since I am not writing my own trigger but only using one from an official provider. The current behaviour is very unexpected and cost me a lot of time to figure out. From a user perspective, though, it would be much more convenient if apply_function were picked up whenever the code is updated across all containers, e.g. by sidestepping the cache either for the whole AwaitMessageTrigger or at least for apply_function.
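   One possible shape for the "sidestep the cache for apply_function" idea, as a minimal sketch only: a resolver that re-executes the module when its source file's modification time has changed since the last lookup. `resolve_fresh`, `_seen_mtimes` and `fresh_dag_module` are hypothetical names; nothing here is existing Airflow or provider API.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path

# Hypothetical, for illustration only: source-file mtime seen at the last resolution of each module.
_seen_mtimes: dict[str, float] = {}


def resolve_fresh(dotted_path: str):
    """Resolve "module.attr", reloading the module first if its file changed on disk."""
    module_path, attr_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    source = getattr(module, "__file__", None)
    if source:
        mtime = os.path.getmtime(source)
        if module_path in _seen_mtimes and _seen_mtimes[module_path] != mtime:
            module = importlib.reload(module)  # re-execute the edited source
        _seen_mtimes[module_path] = mtime
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        ) from None


def demo_fresh_resolve() -> list[str]:
    """Rename a function in a temporary module and resolve it again without restarting."""
    old_flag = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # no .pyc files, so a reload always re-reads the source
    results = []
    with tempfile.TemporaryDirectory() as tmp:
        module_file = Path(tmp) / "fresh_dag_module.py"
        sys.path.insert(0, tmp)
        try:
            module_file.write_text("def old_apply_function(msg):\n    return msg\n")
            os.utime(module_file, (1_000_000, 1_000_000))
            importlib.invalidate_caches()
            results.append(resolve_fresh("fresh_dag_module.old_apply_function")("v1"))

            # Edit the file and give it a clearly different mtime.
            module_file.write_text("def my_apply_function(msg):\n    return msg.upper()\n")
            os.utime(module_file, (2_000_000, 2_000_000))
            results.append(resolve_fresh("fresh_dag_module.my_apply_function")("v2"))
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("fresh_dag_module", None)
            _seen_mtimes.pop("fresh_dag_module", None)
            sys.dont_write_bytecode = old_flag
    return results
```

   Checking the mtime keeps the common path cheap (one stat call per lookup), but a reload re-executes the whole module; for a DAG file that means re-running its top-level code inside the triggerer, which is a trade-off a PR would need to weigh against simply documenting the required restart.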
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
