DinGo4DEV opened a new issue, #51809:
URL: https://github.com/apache/airflow/issues/51809

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   ## What Happened
   
   When the trigger below was started and a message event arrived, the trigger job failed with an "unexpected keyword argument" error.
   
   ``` python
   import json

   from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger


   def apply_function(empty, message, **kwargs):
       val = json.loads(message.value())
       print(f"Value in message is {val}")
       return True


   trigger = MessageQueueTrigger(
       queue="kafka://localhost:9092/test",
       apply_function="kafka_example_dag.apply_function",
       kafka_config_id="kafka_t1"
   )
   ```
   
   ## Error 
   
   ```
   Trigger ID 12 exited with error apply_function() got an unexpected keyword argument '__var' [airflow.jobs.triggerer_job_runner]
   error_detail=[{'exc_type': 'TypeError',
     'exc_value': "apply_function() got an unexpected keyword argument '__var'",
     'exc_notes': [], 'syntax_error': None, 'is_cause': False,
     'frames': [
       {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 901, 'name': 'cleanup_finished_triggers'},
       {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1014, 'name': 'run_trigger'},
       {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/kafka/triggers/await_message.py', 'lineno': 111, 'name': 'run'},
       {'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 468, 'name': '__call__'},
       {'filename': '/usr/local/lib/python3.12/concurrent/futures/thread.py', 'lineno': 59, 'name': 'run'},
       {'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 522, 'name': 'thread_handler'}],
     'is_group': False, 'exceptions': []}]
   ```
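
   As the serialization dumps in the next section show, 3.0.2 keeps `apply_function_args`/`apply_function_kwargs` in Airflow's internal `__var`/`__type` encoding. A hypothetical reduction of the failure, assuming those encoded markers end up splatted as keyword arguments into a callable that does not accept them (`noop` below is a stand-in, not the provider's code):

   ``` python
   # `noop` is a hypothetical stand-in for the configured apply_function.
   def noop(message):
       return True

   # Encoded form as it appears in the 3.0.2 serialization dump
   # (Encoding.VAR == '__var', Encoding.TYPE == '__type').
   encoded_kwargs = {"__var": {}, "__type": "dict"}

   # Splatting the still-encoded dict reproduces the reported error:
   noop("msg", **encoded_kwargs)
   # TypeError: noop() got an unexpected keyword argument '__var'
   ```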
   
   
   
   
   
   ### What you think should happen instead?
   
   Since version 3.0.1 works fine, I printed out the serialized trigger object in both versions to compare.
   ``` python
   ## use above trigger

   from airflow.serialization import serialized_objects

   serialized_objects._encode_trigger(trigger)

   """ Airflow 3.0.2 outputs:
   {'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
    'kwargs': {'topics': ['test_topic'],
     'apply_function': 'test.noop',
     'apply_function_args': {<Encoding.VAR: '__var'>: [],
      <Encoding.TYPE: '__type'>: <DagAttributeTypes.TUPLE: 'tuple'>},
     'apply_function_kwargs': {<Encoding.VAR: '__var'>: {},
      <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
     'kafka_config_id': 'kafka_default',
     'poll_timeout': 1,
     'poll_interval': 5}}
   """

   """ Airflow 3.0.1 outputs:
   {'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
    'kwargs': {'topics': ['test_topic'],
     'apply_function': 'test.noop',
     'apply_function_args': (),
     'apply_function_kwargs': {},
     'kafka_config_id': 'kafka_default',
     'poll_timeout': 1,
     'poll_interval': 5}}
   """

   from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

   trigger = AwaitMessageTrigger(
       topics=["test_topic"],
       apply_function="test.noop",
       kafka_config_id="kafka_default",
   )
   serialized_objects._encode_trigger(trigger)

   """ Airflow 3.0.1 & 3.0.2 outputs:
   {'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
    'kwargs': {'topics': ['test_topic'],
     'apply_function': 'test.noop',
     'apply_function_args': (),
     'apply_function_kwargs': {},
     'kafka_config_id': 'kafka_default',
     'poll_timeout': 1,
     'poll_interval': 5}}
   """
   ```
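
   For reference, the 3.0.2 encoded form round-trips back to the plain values that 3.0.1 emitted, which suggests the nested trigger kwargs should be decoded (or never left encoded) before the triggerer re-instantiates the trigger. A minimal sketch, assuming `BaseSerialization.deserialize` is the counterpart of the encoding shown above:

   ``` python
   from airflow.serialization.serialized_objects import BaseSerialization

   # Encoded form as it appears in the 3.0.2 output above
   # (Encoding.VAR == '__var', Encoding.TYPE == '__type').
   encoded_kwargs = {"__var": {}, "__type": "dict"}

   # Decoding restores the plain dict that 3.0.1 serialized directly.
   assert BaseSerialization.deserialize(encoded_kwargs) == {}
   ```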
   
   ### How to reproduce
   
   1. Create `kafka_example_dag.py`
   2. Create a Kafka connection with the connection ID `kafka_t1`
   3. Add the code below to the DAG file and send a message to the Kafka topic `test` (a minimal producer sketch follows the DAG code)
   
   ``` python
   import json

   from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.sdk import dag, Asset, AssetWatcher


   def apply_function(empty, message, **kwargs):
       val = json.loads(message.value())
       print(f"Value in message is {val}")
       return True


   trigger = MessageQueueTrigger(
       queue="kafka://localhost:9092/test",
       apply_function="kafka_example_dag.apply_function",
       kafka_config_id="kafka_t1"
   )

   asset = Asset("kafka_queue_asset_2", watchers=[AssetWatcher(name="kafka_watcher_2", trigger=trigger)])


   @dag(dag_id="example_kafka_watcher_2", schedule=[asset])
   def example_kafka_watcher():
       EmptyOperator(task_id="task")


   example_kafka_watcher()
   ```
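
   For step 3, any JSON payload on the `test` topic works. A minimal producer sketch using `confluent-kafka` (a dependency of the Kafka provider); the bootstrap address mirrors the `kafka://localhost:9092/test` queue URI above, so adjust it to your setup:

   ``` python
   import json

   from confluent_kafka import Producer

   # Bootstrap server taken from the queue URI in the DAG above;
   # adjust if your broker is advertised differently.
   producer = Producer({"bootstrap.servers": "localhost:9092"})
   producer.produce("test", value=json.dumps({"hello": "airflow"}))
   producer.flush()  # block until delivery
   ```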
   
   
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-apache-kafka==1.9.0
   apache-airflow-providers-common-messaging==1.0.2
   ```
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   ``` yaml
   x-airflow-common:
     &airflow-common
     # In order to add custom dependencies or upgrade provider distributions you can use your extended image.
     # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
     # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
     image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.2}
     # build: .
     environment:
       &airflow-common-env
       AIRFLOW__CORE__EXECUTOR: CeleryExecutor
       AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
       AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
       AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
       AIRFLOW__CORE__FERNET_KEY: ''
       AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
       AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
       AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
       AIRFLOW_CONN_KAFKA_DEFAULT: 'kafka://kafka:29092'
       PYTHONPATH: /opt/airflow/dags:$PYTHONPATH
       # yamllint disable rule:line-length
       # Use simple http server on scheduler for health checks
       # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
       # yamllint enable rule:line-length
       AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
       # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
       # for other purpose (development, test and especially production usage) build/extend Airflow image.
       _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-apache-kafka apache-airflow-providers-git
       # The following line can be used to set a custom config file, stored in the local config folder
       AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
     volumes:
       - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
       - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
       - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
       - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
     user: "${AIRFLOW_UID:-50000}:0"
     depends_on:
       &airflow-common-depends-on
       redis:
         condition: service_healthy
       postgres:
         condition: service_healthy
       kafka:
         condition: service_healthy

   services:
     postgres:
       image: postgres:13
       environment:
         POSTGRES_USER: airflow
         POSTGRES_PASSWORD: airflow
         POSTGRES_DB: airflow
       ports:
         - "5432:5432"
       volumes:
         - postgres-db-volume:/var/lib/postgresql/data
       healthcheck:
         test: ["CMD", "pg_isready", "-U", "airflow"]
         interval: 10s
         retries: 5
         start_period: 5s
       restart: always

     redis:
       # Redis is limited to 7.2-bookworm due to licencing change
       # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
       image: redis:7.2-bookworm
       expose:
         - 6379
       healthcheck:
         test: ["CMD", "redis-cli", "ping"]
         interval: 10s
         timeout: 30s
         retries: 50
         start_period: 30s
       restart: always

     kafbat-ui:
       container_name: kafbat-ui
       image: ghcr.io/kafbat/kafka-ui:latest
       ports:
         - 8082:8080
       environment:
         DYNAMIC_CONFIG_ENABLED: 'true'
         KAFKA_CLUSTERS_0_NAME: airflow_test
         KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
       depends_on:
         kafka:
           condition: service_healthy

     kafka:
       image: confluentinc/cp-kafka:latest
       hostname: kafka
       user: "1000"  # Run as non-root user
       expose:
         - 9092
         - 29092
         - 29093
       ports:
         - "9092:9092"
         - "29093:29093"  # Controller port
         - "29092:29092"  # Broker port
       environment:
         KAFKA_KRAFT_MODE: "true"  # Enables KRaft mode.
         KAFKA_PROCESS_ROLES: broker,controller  # Kafka acts as both controller and broker.
         KAFKA_NODE_ID: 1  # Unique ID for the Kafka instance.
         KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"  # Controller quorum.
         KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://kafka:9092
         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
         KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
         KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # Enables automatic topic creation.
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # Single replica for simplicity.
         KAFKA_LOG_RETENTION_HOURS: 168  # Log retention period (7 days).
         KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0  # No rebalance delay.
         CLUSTER_ID: "c31d0fa15a5b483bbdf1edd4c8017394"  # Unique Kafka cluster ID.
       healthcheck:
         test: ["CMD", "kafka-topics", "--bootstrap-server", "kafka:9092", "--list"]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       volumes:
         - ./kafka-data:/var/lib/kafka/data

     airflow-apiserver:
       <<: *airflow-common
       command: api-server
       ports:
         - "8080:8080"
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully

     airflow-scheduler:
       <<: *airflow-common
       command: scheduler
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully

     airflow-dag-processor:
       <<: *airflow-common
       command: dag-processor
       healthcheck:
         test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully

     airflow-worker:
       <<: *airflow-common
       command: celery worker
       healthcheck:
         # yamllint disable rule:line-length
         test:
           - "CMD-SHELL"
           - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       environment:
         <<: *airflow-common-env
         # Required to handle warm shutdown of the celery workers properly
         # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
         DUMB_INIT_SETSID: "0"
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-apiserver:
           condition: service_healthy
         airflow-init:
           condition: service_completed_successfully

     airflow-triggerer:
       <<: *airflow-common
       command: triggerer
       healthcheck:
         test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully

     airflow-init:
       <<: *airflow-common
       entrypoint: /bin/bash
       # yamllint disable rule:line-length
       command:
         - -c
         - |
           if [[ -z "${AIRFLOW_UID}" ]]; then
             echo
             echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
             echo "If you are on Linux, you SHOULD follow the instructions below to set "
             echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
             echo "For other operating systems you can get rid of the warning with manually created .env file:"
             echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
             echo
             export AIRFLOW_UID=$(id -u)
           fi
           one_meg=1048576
           mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
           cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
           disk_available=$$(df / | tail -1 | awk '{print $$4}')
           warning_resources="false"
           if (( mem_available < 4000 )) ; then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
             echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
             echo
             warning_resources="true"
           fi
           if (( cpus_available < 2 )); then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
             echo "At least 2 CPUs recommended. You have $${cpus_available}"
             echo
             warning_resources="true"
           fi
           if (( disk_available < one_meg * 10 )); then
             echo
             echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
             echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
             echo
             warning_resources="true"
           fi
           if [[ $${warning_resources} == "true" ]]; then
             echo
             echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
             echo "Please follow the instructions to increase amount of resources available:"
             echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
             echo
           fi
           echo
           echo "Creating missing opt dirs if missing:"
           echo
           mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
           echo
           echo "Airflow version:"
           /entrypoint airflow version
           echo
           echo "Files in shared volumes:"
           echo
           ls -la /opt/airflow/{logs,dags,plugins,config}
           echo
           echo "Running airflow config list to create default config file if missing."
           echo
           /entrypoint airflow config list >/dev/null
           echo
           echo "Files in shared volumes:"
           echo
           ls -la /opt/airflow/{logs,dags,plugins,config}
           echo
           echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
           echo
           chown -R "${AIRFLOW_UID}:0" /opt/airflow/
           echo
           echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
           echo
           chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
           echo
           echo "Files in shared volumes:"
           echo
           ls -la /opt/airflow/{logs,dags,plugins,config}

       # yamllint enable rule:line-length
       environment:
         <<: *airflow-common-env
         _AIRFLOW_DB_MIGRATE: 'true'
         _AIRFLOW_WWW_USER_CREATE: 'true'
         _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
         _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
         _PIP_ADDITIONAL_REQUIREMENTS: 'apache-airflow-providers-apache-kafka apache-airflow-providers-git'
       user: "0:0"

     airflow-cli:
       <<: *airflow-common
       profiles:
         - debug
       environment:
         <<: *airflow-common-env
         CONNECTION_CHECK_MAX_COUNT: "0"
       # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
       command:
         - bash
         - -c
         - airflow
       depends_on:
         <<: *airflow-common-depends-on

     # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
     # or by explicitly targeting it on the command line e.g. docker-compose up flower.
     # See: https://docs.docker.com/compose/profiles/
     flower:
       <<: *airflow-common
       command: celery flower
       profiles:
         - flower
       ports:
         - "5555:5555"
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
         interval: 30s
         timeout: 10s
         retries: 5
         start_period: 30s
       restart: always
       depends_on:
         <<: *airflow-common-depends-on
         airflow-init:
           condition: service_completed_successfully

   networks:
     default:
       driver: bridge
       driver_opts:
         com.docker.network.bridge.host_binding_ipv4: "127.0.0.1"

   volumes:
     postgres-db-volume:
   ```
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

