This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d05a47def9 FIX add error_cb to `confluent.Consumer` config in
`ConsumerFromTopic` (#44307)
7d05a47def9 is described below
commit 7d05a47def9c8f84c7b558cead88da9fa90d9552
Author: Success Moses <[email protected]>
AuthorDate: Sun Dec 8 23:17:22 2024 +0100
FIX add error_cb to `confluent.Consumer` config in `ConsumerFromTopic`
(#44307)
---
.../connections/kafka.rst | 6 ++++--
.../hooks.rst | 14 +++++++++++++
.../providers/apache/kafka/hooks/consume.py | 23 ++++++++++++++++++++--
3 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
index 3e570506b64..c1803f0b21c 100644
--- a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
+++ b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
@@ -35,8 +35,10 @@ Kafka hooks and operators use ``kafka_default`` by default,
this connection is v
Configuring the Connection
--------------------------
-Connections are configured as a json serializable string provided to the
``extra`` field. A full list of parameters
-are described in the `Confluent Kafka python library
<https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
+Connections are configured as a json serializable string provided to the
``extra`` field. The ``error_cb`` parameter can be
+used to specify a callback function by providing a path to the function. e.g
``"module.callback_func"``. A full list
+of parameters are described in the
+`Confluent Kafka python library
<https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
If you are defining the Airflow connection from the Airflow UI, the ``extra``
field will be renamed to ``Config Dict``.
diff --git a/docs/apache-airflow-providers-apache-kafka/hooks.rst
b/docs/apache-airflow-providers-apache-kafka/hooks.rst
index 7737d4fdb55..6d80d30ad4b 100644
--- a/docs/apache-airflow-providers-apache-kafka/hooks.rst
+++ b/docs/apache-airflow-providers-apache-kafka/hooks.rst
@@ -42,6 +42,20 @@ Reference
For further information, look at `Apache Kafka Admin config documentation
<https://kafka.apache.org/documentation/#adminclientconfigs>`_.
+.. _howto/hook:KafkaAuthenticationError:
+
+KafkaAuthenticationError
+------------------------
+
+Custom exception for Kafka authentication failures.
+
+Reference
+"""""""""
+
+For further information, look at
+`Confluent Kafka Documentation
<https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-kafkaerror>`_.
+
+
.. _howto/hook:KafkaConsumerHook:
KafkaConsumerHook
diff --git a/providers/src/airflow/providers/apache/kafka/hooks/consume.py
b/providers/src/airflow/providers/apache/kafka/hooks/consume.py
index 014798910d0..67d594ae7c8 100644
--- a/providers/src/airflow/providers/apache/kafka/hooks/consume.py
+++ b/providers/src/airflow/providers/apache/kafka/hooks/consume.py
@@ -18,9 +18,23 @@ from __future__ import annotations
from collections.abc import Sequence
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+from airflow.utils.module_loading import import_string
+
+
+class KafkaAuthenticationError(Exception):
+ """Custom exception for Kafka authentication failures."""
+
+ pass
+
+
+def error_callback(err):
+ """Handle kafka errors."""
+ if err.code() == KafkaError._AUTHENTICATION:
+ raise KafkaAuthenticationError(f"Authentication failed: {err}")
+ print("Exception received: ", err)
class KafkaConsumerHook(KafkaBaseHook):
@@ -36,7 +50,12 @@ class KafkaConsumerHook(KafkaBaseHook):
self.topics = topics
def _get_client(self, config) -> Consumer:
- return Consumer(config)
+ config_shallow = config.copy()
+ if config.get("error_cb") is None:
+ config_shallow["error_cb"] = error_callback
+ else:
+ config_shallow["error_cb"] = import_string(config["error_cb"])
+ return Consumer(config_shallow)
def get_consumer(self) -> Consumer:
"""Return a Consumer that has been subscribed to topics."""