tanaydalmia opened a new issue, #50202:
URL: https://github.com/apache/airflow/issues/50202

   ### Apache Airflow Provider(s)
   
   apache-kafka
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-kafka  -  1.8.1
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   I'm using the Airflow Helm Chart with airflowVersion: "2.10.5" and executor: 
"LocalExecutor".
   Also, the dags are part of externally mounted PVC. 
   
   ### What happened
   
   Getting ModuleNotFoundError: No module named 'kafka_sensor_jira'.
   File Name: kafka_sensor_jira
   
   ### What you think should happen instead
   
   It should have worked because it works fine on my local docker system. The 
only change here I see is the external PVC which consists of the DAG files.
   
   
   ### How to reproduce
   
   For the below code, create a DAG file name as kafka_sensor_jira.py, and just 
run the DAG, it throws error saying no module kafka_sensor_jira
   Code:
   from airflow import DAG
   from airflow.decorators import dag
   from airflow.providers.apache.kafka.sensors.kafka import (
       AwaitMessageTriggerFunctionSensor,
   )
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from pendulum import datetime
   # from helpers import listen_function
   import json
   import random
   import string
   import sys
   import os
   from pathlib import Path
   
   TOPIC_NAME = "graphdb_jira"
   
   
   def _generate_uuid():
       letters = string.ascii_lowercase
       return "".join(random.choice(letters) for i in range(6))
   
   
   with DAG(
       dag_id="kafka_sensor_jira_dag",
       start_date=datetime(2025, 3, 17),
       schedule_interval=None,  # This DAG runs continuously
       catchup=False,
   ):
   
       def listen_function(message):
           val = json.loads(message.value().decode("utf-8"))
           # Process the message here if needed
           return val
   
       def trigger_downstream_dag(message, **context):
           TriggerDagRunOperator(
               trigger_dag_id="kafka_consumer_jira_dag",
               task_id=f"kafka_sensor_jira{_generate_uuid()}",
           ).execute(context)
       
       await_message_sensor = AwaitMessageTriggerFunctionSensor(
           task_id="kafka_sensor_jira",
           topics=[TOPIC_NAME],
           apply_function="kafka_sensor_jira.listen_function",
           kafka_config_id="kafka_default_jira",
           event_triggered_function=trigger_downstream_dag,
       )
   
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to