shahar1 commented on code in PR #38673:
URL: https://github.com/apache/airflow/pull/38673#discussion_r1558097913


##########
airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from google.api_core.client_options import ClientOptions
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.cloud.aiplatform_v1 import PredictionServiceClient
+
+from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID, GoogleBaseHook
+
+if TYPE_CHECKING:
+    from google.api_core.retry import Retry
+    from google.cloud.aiplatform_v1.types import PredictResponse
+
+
+class PredictionServiceHook(GoogleBaseHook):
+    """Hook for Google Cloud Vertex AI Prediction API."""
+
+    def get_prediction_service_client(self, region: str | None = None) -> 
PredictionServiceClient:
+        """Return PredictionServiceClient object."""
+        if region and region != "global":
+            client_options = 
ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443")
+        else:
+            client_options = ClientOptions()
+
+        return PredictionServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def predict(
+        self,
+        endpoint_id: str,
+        instances: list[str],
+        location: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        params: dict[str, str] | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> PredictResponse:
+        """
+        Perform an online prediction and returns the prediction result in the 
response.
+
+        :param endpoint_id: Name of the endpoint_id requested to serve the 
prediction.
+        :param instances: Required. The instances that are the input to the 
prediction call. A DeployedModel
+            may have an upper limit on the number of instances it supports per 
request, and when it is
+            exceeded the prediction call errors in case of AutoML Models, or, 
in case of customer created
+            Models, the behaviour is as documented by that Model.
+        :param params: Additional domain-specific parameters, any string must 
be up to 25000 characters long.
+        :param project_id: ID of the Google Cloud project where model is 
located if None then
+            default project_id is used.
+        :param location: The location of the project.
+        :param retry: A retry object used to retry requests. If `None` is 
specified, requests will not be
+            retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete. Note that if
+            `retry` is specified, the timeout applies to each individual 
attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_prediction_service_client(location)
+        endpoint = 
f"projects/{project_id}/locations/{location}/endpoints/{endpoint_id}"
+        return client.predict(
+            request={"endpoint": endpoint, "instances": instances, "params": 
params},

Review Comment:
   Could you please recheck the `params` argument?
   According to the API, if I'm not wrong, it should be named `parameters`.



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -230,11 +261,40 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Context):
-        hook = CloudAutoMLHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
+    @cached_property
+    def hook(self) -> CloudAutoMLHook | PredictionServiceHook:
+        if self.model_id:
+            return CloudAutoMLHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+        else:  # endpoint_id defined
+            return PredictionServiceHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )

Review Comment:
   Please add tests for both branches of this `if`



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -429,6 +519,8 @@ def __init__(
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context):
+        if "translation_dataset_metadata" in self.dataset:
+            _raise_exception_for_deprecated_operator(self.__class__.__name__, 
"CreateDatasetOperator")

Review Comment:
   Please add a test for this `if` (and similar `if`s)



##########
airflow/providers/google/cloud/hooks/vertex_ai/prediction_service.py:
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from google.api_core.client_options import ClientOptions
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.cloud.aiplatform_v1 import PredictionServiceClient
+
+from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID, GoogleBaseHook
+
+if TYPE_CHECKING:
+    from google.api_core.retry import Retry
+    from google.cloud.aiplatform_v1.types import PredictResponse
+
+
+class PredictionServiceHook(GoogleBaseHook):
+    """Hook for Google Cloud Vertex AI Prediction API."""
+
+    def get_prediction_service_client(self, region: str | None = None) -> 
PredictionServiceClient:
+        """Return PredictionServiceClient object."""
+        if region and region != "global":
+            client_options = 
ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443")
+        else:
+            client_options = ClientOptions()
+
+        return PredictionServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
+        )

Review Comment:
   Same as my previous comment :)



##########
airflow/providers/google/cloud/hooks/automl.py:
##########
@@ -640,3 +640,22 @@ def delete_dataset(
             metadata=metadata,
         )
         return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_dataset(
+        self,
+        dataset_id: str,
+        location: str,
+        project_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Dataset:

Review Comment:
   Could you please add a docstring and tests for this method?



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -52,12 +55,36 @@
 MetaData = Sequence[Tuple[str, str]]
 
 
+def _raise_exception_for_deprecated_operator(
+    deprecated_class_name: str, alternative_class_names: str | list[str]
+):
+    if isinstance(alternative_class_names, str):
+        alternative_class_name_str = alternative_class_names
+    elif len(alternative_class_names) == 1:
+        alternative_class_name_str = alternative_class_names[0]
+    else:
+        alternative_class_name_str = ", ".join(f"`{cls_name}`" for cls_name in 
alternative_class_names[:-1])
+        alternative_class_name_str += f" or `{alternative_class_names[-1]}`"
+
+    raise AirflowProviderDeprecationWarning(
+        f"{deprecated_class_name} for text, image and video prediction is "
+        f"deprecated and will be removed after 31.03.2024. All the 
functionality of "
+        f"legacy AutoML Natural Language, Vision, Video Intelligence and 
Tables "
+        f"and new features are available on the Vertex AI platform. "
+        f"Please use {alternative_class_name_str} from VertexAI."

Review Comment:
   ```suggestion
           f"Please use {alternative_class_name_str} from Vertex AI."
   ```



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -52,12 +55,36 @@
 MetaData = Sequence[Tuple[str, str]]
 
 
+def _raise_exception_for_deprecated_operator(
+    deprecated_class_name: str, alternative_class_names: str | list[str]
+):
+    if isinstance(alternative_class_names, str):
+        alternative_class_name_str = alternative_class_names
+    elif len(alternative_class_names) == 1:
+        alternative_class_name_str = alternative_class_names[0]
+    else:
+        alternative_class_name_str = ", ".join(f"`{cls_name}`" for cls_name in 
alternative_class_names[:-1])
+        alternative_class_name_str += f" or `{alternative_class_names[-1]}`"
+
+    raise AirflowProviderDeprecationWarning(
+        f"{deprecated_class_name} for text, image and video prediction is "
+        f"deprecated and will be removed after 31.03.2024. All the 
functionality of "
+        f"legacy AutoML Natural Language, Vision, Video Intelligence and 
Tables "
+        f"and new features are available on the Vertex AI platform. "
+        f"Please use {alternative_class_name_str} from VertexAI."
+    )

Review Comment:
   Following @Taragolis's comment, you should either `raise 
AirflowException("...")` or `warnings.warn("...", 
AirflowProviderDeprecationWarning)` - as these are the conventions for applying 
exceptions and warnings, respectively.



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -52,12 +55,36 @@
 MetaData = Sequence[Tuple[str, str]]
 
 
+def _raise_exception_for_deprecated_operator(
+    deprecated_class_name: str, alternative_class_names: str | list[str]
+):
+    if isinstance(alternative_class_names, str):
+        alternative_class_name_str = alternative_class_names
+    elif len(alternative_class_names) == 1:
+        alternative_class_name_str = alternative_class_names[0]
+    else:
+        alternative_class_name_str = ", ".join(f"`{cls_name}`" for cls_name in 
alternative_class_names[:-1])
+        alternative_class_name_str += f" or `{alternative_class_names[-1]}`"
+
+    raise AirflowProviderDeprecationWarning(
+        f"{deprecated_class_name} for text, image and video prediction is "
+        f"deprecated and will be removed after 31.03.2024. All the 
functionality of "

Review Comment:
   Please update similar warnings as well, as it has already been removed by 
GCP:
   
   ```suggestion
           f"{deprecated_class_name} for text, image, and video prediction has 
been "
           f"deprecated and no longer available. All the functionality of "
   ```



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -230,11 +261,40 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Context):
-        hook = CloudAutoMLHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
+    @cached_property
+    def hook(self) -> CloudAutoMLHook | PredictionServiceHook:
+        if self.model_id:
+            return CloudAutoMLHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+        else:  # endpoint_id defined
+            return PredictionServiceHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+
+    def _check_model_type(self):
+        hook: CloudAutoMLHook = self.hook
+        model: Model = hook.get_model(
+            model_id=self.model_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
         )
+        if not hasattr(model, "translation_model_metadata"):
+            raise AirflowException(
+                "AutoMLPredictOperator for text, image and video prediction is 
"
+                "deprecated. Please use endpoint_id param instead of model_id 
param."
+            )
+
+    def execute(self, context: Context):
+        if self.model_id:
+            self._check_model_type()
+
+        hook: CloudAutoMLHook | PredictionServiceHook = self.hook

Review Comment:
   ```suggestion
           hook = self.hook
   ```



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -205,7 +231,8 @@ class AutoMLPredictOperator(GoogleCloudBaseOperator):
     def __init__(
         self,
         *,
-        model_id: str,
+        model_id: str | None = None,
+        endpoint_id: str | None = None,

Review Comment:
   Please add a docstring for this new argument



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -219,7 +246,11 @@ def __init__(
     ) -> None:
         super().__init__(**kwargs)
 
+        if model_id is None and endpoint_id is None:
+            raise AirflowException("You must specify model_id or endpoint_id!")

Review Comment:
   1. `model_id` is a template field; related value checks shouldn't be here - 
but at the beginning of the `execute()` method. You could relocate this `if` 
there, while referring to `model_id` as `self.model_id` and `endpoint_id` as 
`self.endpoint_id`.
   2. Please add a test for this `if` after relocating it.



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -230,11 +261,40 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Context):
-        hook = CloudAutoMLHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
+    @cached_property
+    def hook(self) -> CloudAutoMLHook | PredictionServiceHook:
+        if self.model_id:
+            return CloudAutoMLHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+        else:  # endpoint_id defined
+            return PredictionServiceHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+
+    def _check_model_type(self):
+        hook: CloudAutoMLHook = self.hook
+        model: Model = hook.get_model(
+            model_id=self.model_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
         )
+        if not hasattr(model, "translation_model_metadata"):
+            raise AirflowException(
+                "AutoMLPredictOperator for text, image and video prediction is 
"
+                "deprecated. Please use endpoint_id param instead of model_id 
param."
+            )

Review Comment:
   Could you please add a test for this exception?



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -230,11 +261,40 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Context):
-        hook = CloudAutoMLHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
+    @cached_property
+    def hook(self) -> CloudAutoMLHook | PredictionServiceHook:
+        if self.model_id:
+            return CloudAutoMLHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+        else:  # endpoint_id defined
+            return PredictionServiceHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+            )
+
+    def _check_model_type(self):
+        hook: CloudAutoMLHook = self.hook
+        model: Model = hook.get_model(

Review Comment:
   Why don't you trust the return type hints given in the methods' signatures?
   
   ```suggestion
       def _check_model_type(self):
           hook = self.hook
           model = hook.get_model(
   ```



##########
tests/providers/google/cloud/operators/test_automl.py:
##########
@@ -324,41 +350,38 @@ def test_execute(self, mock_hook):
         dataset = copy.deepcopy(DATASET)
         dataset["name"] = DATASET_ID
 
-        op = AutoMLTablesUpdateDatasetOperator(
-            dataset=dataset,
-            update_mask=MASK,
-            location=GCP_LOCATION,
-            task_id=TASK_ID,
-        )
-        op.execute(context=mock.MagicMock())
-        mock_hook.return_value.update_dataset.assert_called_once_with(
-            dataset=dataset,
-            metadata=(),
-            retry=DEFAULT,
-            timeout=None,
-            update_mask=MASK,
+        with pytest.raises(AirflowProviderDeprecationWarning) as err:
+            AutoMLTablesUpdateDatasetOperator(
+                dataset=dataset,
+                update_mask=MASK,
+                location=GCP_LOCATION,
+                task_id=TASK_ID,
+            )
+        assert str(err.value).startswith(
+            "Call to deprecated class AutoMLTablesUpdateDatasetOperator. "
+            "(Class `AutoMLTablesUpdateDatasetOperator` is deprecated and will 
be removed after"

Review Comment:
   It would be nicer to use the `match` keyword instead:
   https://docs.pytest.org/en/7.1.x/how-to/assert.html
   
   You are welcome to apply it to similar assertions :)



##########
airflow/providers/google/cloud/operators/automl.py:
##########
@@ -246,7 +306,7 @@ def execute(self, context: Context):
             metadata=self.metadata,
         )
         project_id = self.project_id or hook.project_id
-        if project_id:
+        if project_id and self.model_id:

Review Comment:
   Why is `self.model_id` now required?



##########
tests/providers/google/cloud/operators/test_automl.py:
##########
@@ -21,11 +21,16 @@
 from unittest import mock
 
 import pytest
-from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.automl_v1beta1 import BatchPredictResult, Dataset, Model, 
PredictResponse
 
-from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
-from airflow.providers.google.cloud.operators.automl import (
+# For no Pydantic environment, we need to skip the tests
+pytest.importorskip("google.cloud.aiplatform_v1")
+
+from google.api_core.gapic_v1.method import DEFAULT  # noqa: E402
+from google.cloud.automl_v1beta1 import BatchPredictResult, Dataset, Model, 
PredictResponse  # noqa: E402
+
+from airflow.exceptions import AirflowProviderDeprecationWarning  # noqa: E402
+from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook  # 
noqa: E402
+from airflow.providers.google.cloud.operators.automl import (  # noqa: E402

Review Comment:
   Instead of having these `#noqa: E402`, could we maybe add a line in 
`pyproject.toml` (around L356)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to