MaksYermak commented on code in PR #47563:
URL: https://github.com/apache/airflow/pull/47563#discussion_r1987734957
##########
providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py:
##########
@@ -70,6 +69,15 @@ def get_conn(self) -> Any:
and bootstrap_servers.find("cloud.goog") != -1
and bootstrap_servers.find("managedkafka") != -1
):
+            try:
+                from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook
+            except ImportError:
+                from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+                raise AirflowOptionalProviderFeatureException(
+                    "Failed to import ManagedKafkaHook. For using this functionality google provider version "
+                    ">= 14.1.0 should be pre-installed."
+                )
Review Comment:
`KafkaBaseHook` is the hook inside the `apache-kafka` provider, used by the
operators of that provider. It also contains the `get_conn` function, which is
needed for establishing connections to the Kafka cluster.
`ManagedKafkaHook` is the hook from the Google provider, used by the operators
for the Managed Kafka service inside Google Cloud. Those operators can create a
Kafka cluster and manage it, but they cannot push/pull data to it. For pushing
and pulling data, users need the operators from the `apache-kafka` provider,
which first establish a connection to the cluster. The connection for a pure
Kafka cluster differs slightly from the connection for a Kafka cluster inside
Google infrastructure, and that is why I updated the code in `get_conn` and
imported `ManagedKafkaHook`.
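The check that gates this Google-specific connection path can be restated in isolation (a minimal sketch; the marker strings are the ones from the diff context above, and the sample addresses are made up):

```python
def is_google_managed_cluster(bootstrap_servers: str) -> bool:
    """Mirror of the hook's heuristic: a Google-managed Kafka cluster's
    bootstrap address contains both 'cloud.goog' and 'managedkafka'."""
    return "cloud.goog" in bootstrap_servers and "managedkafka" in bootstrap_servers


# A plain Kafka cluster does not match, so get_conn keeps its normal path.
print(is_google_managed_cluster("broker-1.example.com:9092"))  # False
print(is_google_managed_cluster(
    "bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092"
))  # True
```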
For a Kafka cluster managed by Google we need to pass a function that generates
a token for the connection using Google credentials, and a function that obtains
Google credentials exists only inside the Google provider. I added the
token-generation logic to `ManagedKafkaHook` and then added additional logic to
`get_conn` in `KafkaBaseHook`. This logic runs only for clusters managed by
Google ([here is the
code](https://github.com/apache/airflow/blob/main/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py#L67C9-L79C1)).
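A sketch of how such a token callback plugs into the Kafka client configuration, assuming the confluent-kafka/librdkafka convention that the configured `oauth_cb` receives the `sasl.oauthbearer.config` string and returns a `(token, expiry_epoch_seconds)` tuple. `fetch_google_token` and the bootstrap address are hypothetical stand-ins for the provider's real logic:

```python
import time


def fetch_google_token(oauth_config: str) -> tuple[str, float]:
    """Hypothetical stand-in: the real hook would mint an OAuth token from
    Google credentials here. Returns (token, absolute expiry timestamp)."""
    return "dummy-token", time.time() + 3600


# librdkafka-style config a producer/consumer would receive from get_conn
# when the cluster is Google-managed.
config = {
    "bootstrap.servers": "bootstrap.my-cluster.us-central1.managedkafka.my-project.cloud.goog:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "OAUTHBEARER",
    "oauth_cb": fetch_google_token,
}
```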
I think my mistake was that I placed the import outside the `if` statement
guarding this additional logic, which is needed only for clusters managed by
Google. That created a hard binding between the `google` and `apache-kafka`
providers. With this PR I moved the import under the `if` statement and, I hope,
that will make the `google` provider an optional dependency of the
`apache-kafka` provider.
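The fix itself boils down to a lazy import behind the `if` branch. A self-contained sketch of that pattern (the module name is deliberately nonexistent to demonstrate the failure path, and the exception class is a local stand-in for Airflow's `AirflowOptionalProviderFeatureException`):

```python
class OptionalProviderFeatureError(ImportError):
    """Local stand-in for Airflow's AirflowOptionalProviderFeatureException."""


def get_managed_kafka_hook():
    """Import the optional dependency only on the code path that needs it,
    so importing this module never requires the google provider."""
    try:
        from nonexistent_google_provider import ManagedKafkaHook  # type: ignore
    except ImportError:
        raise OptionalProviderFeatureError(
            "Failed to import ManagedKafkaHook. Install google provider >= 14.1.0."
        )
    return ManagedKafkaHook
```

Because the import happens inside the function body, the `google` provider is only required at the moment a Google-managed cluster is actually used, not when the `apache-kafka` hook module is imported.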
@eladkal do I understand correctly that the `google` provider is currently an
obligatory dependency of the `apache-kafka` provider, that this is not good for
us, and that we need to remove this dependency?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]