xBis7 commented on code in PR #68082: URL: https://github.com/apache/airflow/pull/68082#discussion_r3373053448
########## providers/apache/kafka/src/airflow/providers/apache/kafka/plugins/listener.py: ########## @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import atexit +import json +import logging +import os +import time +from datetime import datetime, timezone +from fnmatch import fnmatch +from functools import lru_cache +from typing import TYPE_CHECKING, Any + +from airflow.providers.apache.kafka.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.common.compat.sdk import AirflowPlugin, conf, hookimpl +from airflow.utils.net import get_hostname + +if TYPE_CHECKING: + from confluent_kafka import Producer + from sqlalchemy.orm import Session + + from airflow.models import DagRun, TaskInstance + from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance + from airflow.utils.state import TaskInstanceState + +log = logging.getLogger(__name__) + +CONFIG_SECTION = "kafka_listener" +SCHEMA_VERSION = 1 + + +@lru_cache(maxsize=1) +def _dag_run_events_enabled() -> bool: + return conf.getboolean(CONFIG_SECTION, "dag_run_events_enabled", fallback="False") + + +@lru_cache(maxsize=1) +def _task_instance_events_enabled() -> bool: + return conf.getboolean(CONFIG_SECTION, 
"task_instance_events_enabled", fallback="False") + + +@lru_cache(maxsize=1) +def _get_topic() -> str: + return conf.get(CONFIG_SECTION, "topic", fallback="airflow.events") + + +@lru_cache(maxsize=1) +def _get_bootstrap_servers() -> str: + return conf.get(CONFIG_SECTION, "bootstrap_servers", fallback="").strip() + + +@lru_cache(maxsize=1) +def _get_source() -> str: + source_from_conf = conf.get(CONFIG_SECTION, "source", fallback="") + # Default to the current hostname, if un-set. + return source_from_conf or get_hostname() + + +def _parse_filter_patterns_to_tuple(raw: str) -> tuple[str, ...]: + return tuple(p.strip() for p in raw.split(",") if p.strip()) + + +@lru_cache(maxsize=1) +def _get_dag_run_dag_id_allowlist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple(conf.get(CONFIG_SECTION, "dag_run_dag_id_allowlist", fallback="")) + + +@lru_cache(maxsize=1) +def _get_dag_run_dag_id_denylist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple(conf.get(CONFIG_SECTION, "dag_run_dag_id_denylist", fallback="")) + + +@lru_cache(maxsize=1) +def _get_task_instance_dag_id_allowlist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple( + conf.get(CONFIG_SECTION, "task_instance_dag_id_allowlist", fallback="") + ) + + +@lru_cache(maxsize=1) +def _get_task_instance_dag_id_denylist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple( + conf.get(CONFIG_SECTION, "task_instance_dag_id_denylist", fallback="") + ) + + +@lru_cache(maxsize=1) +def _get_task_instance_task_id_allowlist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple( + conf.get(CONFIG_SECTION, "task_instance_task_id_allowlist", fallback="") + ) + + +@lru_cache(maxsize=1) +def _get_task_instance_task_id_denylist() -> tuple[str, ...]: + return _parse_filter_patterns_to_tuple( + conf.get(CONFIG_SECTION, "task_instance_task_id_denylist", fallback="") + ) + + +@lru_cache(maxsize=1) +def _get_topic_check_timeout() -> int: + return conf.getint(CONFIG_SECTION, 
"topic_check_timeout", fallback="10") + + +@lru_cache(maxsize=1) +def _get_topic_check_retry_interval() -> int: + return conf.getint(CONFIG_SECTION, "topic_check_retry_interval", fallback="60") + + +def _id_is_allowed(id_to_check: str, allowlist: tuple[str, ...], denylist: tuple[str, ...]) -> bool: + """Deny takes precedence; empty allowlist means 'allow all'.""" + if denylist and any(fnmatch(id_to_check, id_pattern) for id_pattern in denylist): + return False + if allowlist and not any(fnmatch(id_to_check, id_pattern) for id_pattern in allowlist): + return False + return True + + +def _dag_run_event_allowed(dag_id: str) -> bool: + return _id_is_allowed( + dag_id, + _get_dag_run_dag_id_allowlist(), + _get_dag_run_dag_id_denylist(), + ) + + +def _task_instance_event_allowed(dag_id: str, task_id: str) -> bool: + return _id_is_allowed( + dag_id, + _get_task_instance_dag_id_allowlist(), + _get_task_instance_dag_id_denylist(), + ) and _id_is_allowed( + task_id, + _get_task_instance_task_id_allowlist(), + _get_task_instance_task_id_denylist(), + ) + + +# confluent_kafka.Producer is not fork-safe — its background threads and +# broker sockets do not survive ``os.fork``. Keep at most one Producer per +# process and reset the cached state in the child so the next call re-inits. +# If the initialization is successful, then the value is cached, +# otherwise init is retried on an interval. +_producer: Producer | None = None +_producer_retry_after: float = 0.0 + + +def _reset_producer_after_fork() -> None: + """Drop any inherited producer in the child; the child re-inits on next call.""" + global _producer, _producer_retry_after + _producer = None + _producer_retry_after = 0.0 + + +os.register_at_fork(after_in_child=_reset_producer_after_fork) + + +def _get_producer() -> Producer | None: + global _producer, _producer_retry_after + # If there is a cached producer, return it, don't re-initialize. 
+        if _producer is not None or time.monotonic() < _producer_retry_after: +            return _producer + +        result: Producer | None = None +        try: +            # The plugin is loaded by every Airflow process. The Producer is a heavy dependency, and +            # by lazily importing here, we avoid the unneeded startup cost. +            from confluent_kafka import Producer + +            # No need to check if bootstrap_servers are set, because it's done when registering the listeners. +            # If not set, then there won't be any listeners set and this part of the code will never execute. +            bootstrap_servers = _get_bootstrap_servers() +            producer = Producer({"bootstrap.servers": bootstrap_servers}) Review Comment: Currently, users create an Airflow connection and set `bootstrap_servers` and any other properties on the connection. Then they initialize the Producer or Consumer hook in a DAG and pass the connection name -- `kafka_config_id` -- as a param to the constructor. The connection carries all the necessary config. In my initial implementation, I reused the existing hook and added a `kafka_config_id` property in the configuration. But it felt tedious to always have to create a connection in advance. That's why I didn't use the hook and instead added a `bootstrap_servers` key in the config. Then I noticed that the initialized producer isn't fork-safe, so that needed to be handled as well. So I reset the producer after a process fork, and registered an atexit hook that flushes the producer at process exit. Because of your comments, I'm revisiting the approach. I'm thinking about moving some of the config options and the fork-handling changes into the hook itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
