MaksYermak commented on code in PR #47563:
URL: https://github.com/apache/airflow/pull/47563#discussion_r1987734957


##########
providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py:
##########
@@ -70,6 +69,15 @@ def get_conn(self) -> Any:
             and bootstrap_servers.find("cloud.goog") != -1
             and bootstrap_servers.find("managedkafka") != -1
         ):
+            try:
+                from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook
+            except ImportError:
+                from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+                raise AirflowOptionalProviderFeatureException(
+                    "Failed to import ManagedKafkaHook. For using this functionality google provider version "
+                    ">= 14.1.0 should be pre-installed."
+                )

Review Comment:
   `KafkaBaseHook` is the hook inside the `apache-kafka` provider and is used by operators from that provider. This hook contains the `get_conn` function, which establishes connections to the Kafka cluster.
   
   `ManagedKafkaHook` is the hook from the Google provider. It is used by operators for the Managed Kafka service inside Google Cloud. These operators can create a Kafka cluster and manage it, but they can't push/pull data to this cluster.
   
   To push and pull data, users need to use operators from the `apache-kafka` provider. To do this, the operators first establish a connection to the cluster. The connection for a pure Kafka cluster and for a Kafka cluster inside Google infrastructure is slightly different, which is why I updated the code for `get_conn` and imported `ManagedKafkaHook`.
   
   For a Kafka cluster managed by Google, we need to pass a function that generates a token for the connection using Google's credentials, and the function that obtains Google credentials exists only inside the `google` provider. I added the token-generation logic to `ManagedKafkaHook`, and then added additional logic to `get_conn` in `KafkaBaseHook`. This logic runs only for clusters managed by Google ([here is the code](https://github.com/apache/airflow/blob/main/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py#L67C9-L79C1)).
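   For context, confluent-kafka resolves SASL/OAUTHBEARER tokens through an `oauth_cb` entry in the client config: a callable that receives the `sasl.oauthbearer.config` string and returns a `(token, expiry_epoch_seconds)` tuple. Here is a minimal sketch of wiring a token supplier into such a config; the `token_supplier` shape and the helper names are illustrative assumptions, not the hook's actual implementation:

   ```python
   import time


   def make_oauth_cb(token_supplier):
       """Wrap a token supplier into confluent-kafka's oauth_cb shape.

       `token_supplier` (hypothetical) returns (token, lifetime_seconds);
       the callback must return (token, absolute expiry as epoch seconds).
       """
       def oauth_cb(_oauthbearer_config):
           token, lifetime_s = token_supplier()
           return token, time.time() + lifetime_s
       return oauth_cb


   def managed_kafka_config(bootstrap_servers, token_supplier):
       # Illustrative config for a Google-managed cluster; the real hook
       # builds its config from the Airflow connection extras.
       return {
           "bootstrap.servers": bootstrap_servers,
           "security.protocol": "SASL_SSL",
           "sasl.mechanism": "OAUTHBEARER",
           "oauth_cb": make_oauth_cb(token_supplier),
       }
   ```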
   
   I think my mistake was that I did not place the import under the `if` statement containing this additional logic, which is needed only for clusters managed by Google. That created a hard binding between the `google` and `apache-kafka` providers. With this PR I moved the import under the `if` statement, and I hope it will make the `google` provider an optional dependency of the `apache-kafka` provider.
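   As a general sketch of the pattern this PR applies (the helper name and messages here are illustrative, not the provider's actual code), the optional dependency can be resolved lazily, so the import cost is only paid on the code path that needs it:

   ```python
   import importlib


   def load_optional(module_name: str, attr: str, hint: str):
       """Import `attr` from `module_name` lazily, raising an actionable
       error when the optional dependency is not installed."""
       try:
           module = importlib.import_module(module_name)
       except ImportError as err:
           # Surface an installation hint instead of a bare ImportError.
           raise ImportError(hint) from err
       return getattr(module, attr)


   # The hook would call this only inside the managed-kafka branch, e.g.:
   # ManagedKafkaHook = load_optional(
   #     "airflow.providers.google.cloud.hooks.managed_kafka",
   #     "ManagedKafkaHook",
   #     "Install apache-airflow-providers-google>=14.1.0 to use Managed Kafka.",
   # )
   ```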
   
   @eladkal, do I understand correctly that the `google` provider is currently an obligatory dependency of the `apache-kafka` provider, that this is not good for us, and that we need to remove this dependency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
