DinGo4DEV opened a new issue, #51809:
URL: https://github.com/apache/airflow/issues/51809
### Apache Airflow version

3.0.2

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

## What Happened

When the trigger below was started and a message event arrived, the trigger job exited with an "unexpected keyword argument" error.

```python
def apply_function(empty, message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True

trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1",
)
```

## Error

```
Trigger ID 12 exited with error apply_function() got an unexpected keyword argument '__var'
[airflow.jobs.triggerer_job_runner] error_detail=[{'exc_type': 'TypeError',
  'exc_value': "apply_function() got an unexpected keyword argument '__var'",
  'exc_notes': [], 'syntax_error': None, 'is_cause': False,
  'frames': [
    {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 901, 'name': 'cleanup_finished_triggers'},
    {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1014, 'name': 'run_trigger'},
    {'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/kafka/triggers/await_message.py', 'lineno': 111, 'name': 'run'},
    {'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 468, 'name': '__call__'},
    {'filename': '/usr/local/lib/python3.12/concurrent/futures/thread.py', 'lineno': 59, 'name': 'run'},
    {'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 522, 'name': 'thread_handler'}],
  'is_group': False, 'exceptions': []}]
```

### What you think should happen instead?

Since version 3.0.1 works fine, I printed out the serialized trigger object to compare.

```python
# use the trigger defined above
from airflow.serialization import serialized_objects

serialized_objects._encode_trigger(trigger)
"""
Airflow 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
            'apply_function': 'test.noop',
            'apply_function_args': {<Encoding.VAR: '__var'>: [], <Encoding.TYPE: '__type'>: <DagAttributeTypes.TUPLE: 'tuple'>},
            'apply_function_kwargs': {<Encoding.VAR: '__var'>: {}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
            'kafka_config_id': 'kafka_default',
            'poll_timeout': 1,
            'poll_interval': 5}}
"""
"""
Airflow 3.0.1 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
            'apply_function': 'test.noop',
            'apply_function_args': (),
            'apply_function_kwargs': {},
            'kafka_config_id': 'kafka_default',
            'poll_timeout': 1,
            'poll_interval': 5}}
"""

from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

trigger = AwaitMessageTrigger(
    topics=["test_topic"],
    apply_function="test.noop",
    kafka_config_id="kafka_default",
)
serialized_objects._encode_trigger(trigger)
"""
Airflow 3.0.1 & 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
            'apply_function': 'test.noop',
            'apply_function_args': (),
            'apply_function_kwargs': {},
            'kafka_config_id': 'kafka_default',
            'poll_timeout': 1,
            'poll_interval': 5}}
"""
```
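Comparing the two outputs, it looks like 3.0.2 hands the trigger its kwargs with the serializer's `__var`/`__type` envelope still in place, so the un-decoded `{'__var': {}, '__type': 'dict'}` eventually reaches `apply_function(**kwargs)` as a literal `__var` keyword. Below is a minimal sketch of the unwrapping step that appears to be missing; `decode_encoded_value` is a hypothetical helper, not an actual Airflow API, and the `'__var'`/`'__type'` keys and `'tuple'`/`'dict'` tags are taken from the serialized output above:

```python
# Hypothetical sketch: unwrap the '__var'/'__type' envelope before the
# trigger kwargs are splatted into apply_function. Not Airflow code.
def decode_encoded_value(value):
    if isinstance(value, dict) and "__var" in value and "__type" in value:
        var, type_ = value["__var"], value["__type"]
        if type_ == "tuple":
            return tuple(decode_encoded_value(v) for v in var)
        if type_ == "dict":
            return {k: decode_encoded_value(v) for k, v in var.items()}
    return value

# The 3.0.2 shape, as shown in the serialized output above.
encoded_kwargs = {
    "topics": ["test_topic"],
    "apply_function": "test.noop",
    "apply_function_args": {"__var": [], "__type": "tuple"},
    "apply_function_kwargs": {"__var": {}, "__type": "dict"},
    "kafka_config_id": "kafka_default",
}
decoded = {k: decode_encoded_value(v) for k, v in encoded_kwargs.items()}
assert decoded["apply_function_args"] == ()
assert decoded["apply_function_kwargs"] == {}  # no stray '__var' key left to splat
```

With that decoding applied, the 3.0.2 structure collapses back to the 3.0.1 one.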
### How to reproduce

1. Create `kafka_example_dag.py`
2. Create a connection for `kafka_t1`
3. Add the code below and send a message to the Kafka topic `test` (a sample producer sketch follows the DAG code)

```python
import json

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import dag, Asset, AssetWatcher


def apply_function(empty, message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True


trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1",
)

asset = Asset("kafka_queue_asset_2", watchers=[AssetWatcher(name="kafka_watcher_2", trigger=trigger)])


@dag(dag_id="example_kafka_watcher_2", schedule=[asset])
def example_kafka_watcher():
    EmptyOperator(task_id="task")


example_kafka_watcher()
```
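For step 3, any message on the `test` topic reproduces the error. One way to publish one, as a rough sketch using `confluent-kafka` (the client library the Kafka provider builds on), assuming the broker is reachable on `localhost:9092`:

```python
import json

from confluent_kafka import Producer  # assumes confluent-kafka is installed

# Publish a single JSON message to the watched topic.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("test", value=json.dumps({"hello": "airflow"}))
producer.flush()  # block until delivery so the triggerer sees the event
```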
### Operating System

Debian GNU/Linux 12 (bookworm)

### Versions of Apache Airflow Providers

```
apache-airflow-providers-apache-kafka==1.9.0
apache-airflow-providers-common-messaging==1.0.2
```

### Deployment

Docker-Compose

### Deployment details

```yaml
x-airflow-common: &airflow-common
  # In order to add custom dependencies or upgrade provider distributions you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.2}
  # build: .
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
    AIRFLOW_CONN_KAFKA_DEFAULT: 'kafka://kafka:29092'
    PYTHONPATH: /opt/airflow/dags:$PYTHONPATH
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-apache-kafka apache-airflow-providers-git
    # The following line can be used to set a custom config file, stored in the local config folder
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
    kafka:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licencing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  kafbat-ui:
    container_name: kafbat-ui
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - 8082:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: airflow_test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      kafka:
        condition: service_healthy

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    user: "1000"  # Run as non-root user
    expose:
      - 9092
      - 29092
      - 29093
    ports:
      - "9092:9092"
      - "29093:29093"  # Controller port
      - "29092:29092"  # Broker port
    environment:
      KAFKA_KRAFT_MODE: "true"  # Enables KRaft mode.
      KAFKA_PROCESS_ROLES: broker,controller  # Kafka acts as both controller and broker.
      KAFKA_NODE_ID: 1  # Unique ID for the Kafka instance.
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"  # Controller quorum.
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # Enables automatic topic creation.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # Single replica for simplicity.
      KAFKA_LOG_RETENTION_HOURS: 168  # Log retention period (7 days).
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0  # No rebalance delay.
      CLUSTER_ID: "c31d0fa15a5b483bbdf1edd4c8017394"  # Unique Kafka cluster ID.
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "kafka:9092", "--list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    volumes:
      - ./kafka-data:/var/lib/kafka/data

  airflow-apiserver:
    <<: *airflow-common
    command: api-server
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-dag-processor:
    <<: *airflow-common
    command: dag-processor
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-apiserver:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
          export AIRFLOW_UID=$(id -u)
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        echo
        echo "Creating missing opt dirs if missing:"
        echo
        mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Airflow version:"
        /entrypoint airflow version
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Running airflow config list to create default config file if missing."
        echo
        /entrypoint airflow config list >/dev/null
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
        echo
        chown -R "${AIRFLOW_UID}:0" /opt/airflow/
        echo
        echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
        echo
        chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: 'apache-airflow-providers-apache-kafka apache-airflow-providers-git'
    user: "0:0"

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
    depends_on:
      <<: *airflow-common-depends-on

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

networks:
  default:
    driver: bridge
    driver_opts:
      com.docker.network.bridge.host_binding_ipv4: "127.0.0.1"

volumes:
  postgres-db-volume:
```

### Anything else?

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)