This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d05a47def9 FIX add error_cb to `confluent.Consumer` config in 
`ConsumerFromTopic` (#44307)
7d05a47def9 is described below

commit 7d05a47def9c8f84c7b558cead88da9fa90d9552
Author: Success Moses <[email protected]>
AuthorDate: Sun Dec 8 23:17:22 2024 +0100

    FIX add error_cb to `confluent.Consumer` config in `ConsumerFromTopic` 
(#44307)
---
 .../connections/kafka.rst                          |  6 ++++--
 .../hooks.rst                                      | 14 +++++++++++++
 .../providers/apache/kafka/hooks/consume.py        | 23 ++++++++++++++++++++--
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst 
b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
index 3e570506b64..c1803f0b21c 100644
--- a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
+++ b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst
@@ -35,8 +35,10 @@ Kafka hooks and operators use ``kafka_default`` by default, 
this connection is v
 Configuring the Connection
 --------------------------
 
-Connections are configured as a json serializable string provided to the 
``extra`` field. A full list of parameters
-are described in the `Confluent Kafka python library 
<https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
+Connections are configured as a json serializable string provided to the 
``extra`` field. The ``error_cb`` parameter can be
+used to specify a callback function by providing the import path to the function, e.g. 
``"module.callback_func"``. A full list
+of parameters is described in the
+`Confluent Kafka python library 
<https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
 
 If you are defining the Airflow connection from the Airflow UI, the ``extra`` 
field will be renamed to ``Config Dict``.
 
diff --git a/docs/apache-airflow-providers-apache-kafka/hooks.rst 
b/docs/apache-airflow-providers-apache-kafka/hooks.rst
index 7737d4fdb55..6d80d30ad4b 100644
--- a/docs/apache-airflow-providers-apache-kafka/hooks.rst
+++ b/docs/apache-airflow-providers-apache-kafka/hooks.rst
@@ -42,6 +42,20 @@ Reference
 For further information, look at `Apache Kafka Admin config documentation 
<https://kafka.apache.org/documentation/#adminclientconfigs>`_.
 
 
+.. _howto/hook:KafkaAuthenticationError:
+
+KafkaAuthenticationError
+------------------------
+
+Custom exception for Kafka authentication failures.
+
+Reference
+"""""""""
+
+For further information, look at
+`Confluent Kafka Documentation 
<https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-kafkaerror>`_.
+
+
 .. _howto/hook:KafkaConsumerHook:
 
 KafkaConsumerHook
diff --git a/providers/src/airflow/providers/apache/kafka/hooks/consume.py 
b/providers/src/airflow/providers/apache/kafka/hooks/consume.py
index 014798910d0..67d594ae7c8 100644
--- a/providers/src/airflow/providers/apache/kafka/hooks/consume.py
+++ b/providers/src/airflow/providers/apache/kafka/hooks/consume.py
@@ -18,9 +18,23 @@ from __future__ import annotations
 
 from collections.abc import Sequence
 
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
 
 from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
+from airflow.utils.module_loading import import_string
+
+
+class KafkaAuthenticationError(Exception):
+    """Custom exception for Kafka authentication failures."""
+
+    pass
+
+
+def error_callback(err):
+    """Handle kafka errors."""
+    if err.code() == KafkaError._AUTHENTICATION:
+        raise KafkaAuthenticationError(f"Authentication failed: {err}")
+    print("Exception received: ", err)
 
 
 class KafkaConsumerHook(KafkaBaseHook):
@@ -36,7 +50,12 @@ class KafkaConsumerHook(KafkaBaseHook):
         self.topics = topics
 
     def _get_client(self, config) -> Consumer:
-        return Consumer(config)
+        config_shallow = config.copy()
+        if config.get("error_cb") is None:
+            config_shallow["error_cb"] = error_callback
+        else:
+            config_shallow["error_cb"] = import_string(config["error_cb"])
+        return Consumer(config_shallow)
 
     def get_consumer(self) -> Consumer:
         """Return a Consumer that has been subscribed to topics."""

Reply via email to