This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f575da4a80d Fix KafkaError.name() called as property instead of method
in create_topic (#65734)
f575da4a80d is described below
commit f575da4a80dfc1bdd6a421f3421f550f79020c9a
Author: Park Jiwon <[email protected]>
AuthorDate: Fri May 15 02:58:42 2026 +0900
Fix KafkaError.name() called as property instead of method in create_topic
(#65734)
---
.../kafka/src/airflow/providers/apache/kafka/hooks/client.py | 2 +-
.../apache/kafka/tests/unit/apache/kafka/hooks/test_client.py | 11 +++++------
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git
a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
index 8df77604737..b26383da5e1 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/client.py
@@ -56,7 +56,7 @@ class KafkaAdminClientHook(KafkaBaseHook):
f.result()
self.log.info("The topic %s has been created.", t)
except KafkaException as e:
- if e.args[0].name == "TOPIC_ALREADY_EXISTS":
+ if e.args[0].name() == "TOPIC_ALREADY_EXISTS":
self.log.warning("The topic %s already exists.", t)
else:
raise
diff --git
a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
index d56bc17b207..5e3ea7189a6 100644
--- a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_client.py
@@ -85,19 +85,18 @@ class TestKafkaAdminClientHook:
@patch(
"airflow.providers.apache.kafka.hooks.base.AdminClient",
)
- def test_create_topic_warning(self, admin_client, caplog):
+ def test_create_topic_already_exists_no_exception_but_warning(self,
admin_client):
mock_f = MagicMock()
kafka_exception = KafkaException()
mock_arg = MagicMock()
- mock_arg.name = "TOPIC_ALREADY_EXISTS"
+ mock_arg.name.return_value = "TOPIC_ALREADY_EXISTS"
kafka_exception.args = [mock_arg]
mock_f.result.side_effect = [kafka_exception]
admin_client.return_value.create_topics.return_value = {"topic_name":
mock_f}
- with caplog.at_level(
- logging.WARNING,
logger="airflow.providers.apache.kafka.hooks.client.KafkaAdminClientHook"
- ):
+
+ with patch.object(self.hook.log, "warning") as mock_warning:
self.hook.create_topic(topics=[("topic_name", 0, 1)])
- assert "The topic topic_name already exists" in caplog.text
+ mock_warning.assert_called_once_with("The topic %s already
exists.", "topic_name")
@patch(
"airflow.providers.apache.kafka.hooks.base.AdminClient",