MaksYermak commented on code in PR #21470: URL: https://github.com/apache/airflow/pull/21470#discussion_r897875039
########## airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py: ########## @@ -0,0 +1,1256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" +This module contains a Google Cloud Vertex AI hook. + +.. spelling:: + + aiplatform + au + codepoints + milli + mae + quantile + quantiles + Quantiles + rmse + rmsle + rmspe + wape + prc + roc + Jetson + forecasted + Struct + sentimentMax + TrainingPipeline + targetColumn + optimizationObjective +""" + +from typing import Dict, List, Optional, Sequence, Tuple, Union + +from google.api_core.operation import Operation +from google.api_core.retry import Retry +from google.cloud.aiplatform import ( + AutoMLForecastingTrainingJob, + AutoMLImageTrainingJob, + AutoMLTabularTrainingJob, + AutoMLTextTrainingJob, + AutoMLVideoTrainingJob, + datasets, + models, +) +from google.cloud.aiplatform_v1 import JobServiceClient, PipelineServiceClient +from google.cloud.aiplatform_v1.services.pipeline_service.pagers import ListTrainingPipelinesPager +from google.cloud.aiplatform_v1.types import TrainingPipeline + +from airflow import AirflowException +from airflow.providers.google.common.hooks.base_google import GoogleBaseHook + + +class AutoMLHook(GoogleBaseHook): + """Hook for Google Cloud 
Vertex AI Auto ML APIs.""" + + def __init__( + self, + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + ) -> None: + super().__init__( + gcp_conn_id=gcp_conn_id, + delegate_to=delegate_to, + impersonation_chain=impersonation_chain, + ) + self._job: Optional[ + Union[ + AutoMLForecastingTrainingJob, + AutoMLImageTrainingJob, + AutoMLTabularTrainingJob, + AutoMLTextTrainingJob, + AutoMLVideoTrainingJob, + ] + ] = None + + def get_pipeline_service_client( + self, + region: Optional[str] = None, + ) -> PipelineServiceClient: + """Returns PipelineServiceClient.""" + client_options = None + if region and region != 'global': + client_options = {'api_endpoint': f'{region}-aiplatform.googleapis.com:443'} + + return PipelineServiceClient( + credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options + ) + + def get_job_service_client( + self, + region: Optional[str] = None, + ) -> JobServiceClient: + """Returns JobServiceClient""" + client_options = None + if region and region != 'global': + client_options = {'api_endpoint': f'{region}-aiplatform.googleapis.com:443'} + + return JobServiceClient( + credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options + ) + + def get_auto_ml_tabular_training_job( + self, + display_name: str, + optimization_prediction_type: str, + optimization_objective: Optional[str] = None, + column_specs: Optional[Dict[str, str]] = None, + column_transformations: Optional[List[Dict[str, Dict[str, str]]]] = None, + optimization_objective_recall_value: Optional[float] = None, + optimization_objective_precision_value: Optional[float] = None, + project: Optional[str] = None, + location: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ) -> 
AutoMLTabularTrainingJob: + """Returns AutoMLTabularTrainingJob object""" + return AutoMLTabularTrainingJob( + display_name=display_name, + optimization_prediction_type=optimization_prediction_type, + optimization_objective=optimization_objective, + column_specs=column_specs, + column_transformations=column_transformations, + optimization_objective_recall_value=optimization_objective_recall_value, + optimization_objective_precision_value=optimization_objective_precision_value, + project=project, + location=location, + credentials=self._get_credentials(), + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + def get_auto_ml_forecasting_training_job( + self, + display_name: str, + optimization_objective: Optional[str] = None, + column_specs: Optional[Dict[str, str]] = None, + column_transformations: Optional[List[Dict[str, Dict[str, str]]]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ) -> AutoMLForecastingTrainingJob: + """Returns AutoMLForecastingTrainingJob object""" + return AutoMLForecastingTrainingJob( + display_name=display_name, + optimization_objective=optimization_objective, + column_specs=column_specs, + column_transformations=column_transformations, + project=project, + location=location, + credentials=self._get_credentials(), + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + def get_auto_ml_image_training_job( + self, + display_name: str, + prediction_type: str = "classification", + multi_label: bool = False, + model_type: str = "CLOUD", + base_model: Optional[models.Model] = None, + project: Optional[str] = None, + location: Optional[str] = None, + 
labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ) -> AutoMLImageTrainingJob: + """Returns AutoMLImageTrainingJob object""" + return AutoMLImageTrainingJob( + display_name=display_name, + prediction_type=prediction_type, + multi_label=multi_label, + model_type=model_type, + base_model=base_model, + project=project, + location=location, + credentials=self._get_credentials(), + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + def get_auto_ml_text_training_job( + self, + display_name: str, + prediction_type: str, + multi_label: bool = False, + sentiment_max: int = 10, + project: Optional[str] = None, + location: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ) -> AutoMLTextTrainingJob: + """Returns AutoMLTextTrainingJob object""" + return AutoMLTextTrainingJob( + display_name=display_name, + prediction_type=prediction_type, + multi_label=multi_label, + sentiment_max=sentiment_max, + project=project, + location=location, + credentials=self._get_credentials(), + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + def get_auto_ml_video_training_job( + self, + display_name: str, + prediction_type: str = "classification", + model_type: str = "CLOUD", + project: Optional[str] = None, + location: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ) -> AutoMLVideoTrainingJob: + """Returns AutoMLVideoTrainingJob object""" + return AutoMLVideoTrainingJob( + display_name=display_name, + 
prediction_type=prediction_type, + model_type=model_type, + project=project, + location=location, + credentials=self._get_credentials(), + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + @staticmethod + def extract_model_id(obj: Dict) -> str: + """Returns unique id of the Model.""" + return obj["name"].rpartition("/")[-1] + + def wait_for_operation(self, operation: Operation, timeout: Optional[float] = None): + """Waits for long-lasting operation to complete.""" + try: + return operation.result(timeout=timeout) + except Exception: + error = operation.exception(timeout=timeout) + raise AirflowException(error) + + def cancel_auto_ml_job(self) -> None: + """Cancel Auto ML Job for training pipeline""" + if self._job: + self._job.cancel() + + @GoogleBaseHook.fallback_to_default_project_id + def create_auto_ml_tabular_training_job( + self, + project_id: str, + region: str, + display_name: str, + dataset: datasets.TabularDataset, + target_column: str, + optimization_prediction_type: str, + optimization_objective: Optional[str] = None, + column_specs: Optional[Dict[str, str]] = None, + column_transformations: Optional[List[Dict[str, Dict[str, str]]]] = None, + optimization_objective_recall_value: Optional[float] = None, + optimization_objective_precision_value: Optional[float] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + training_fraction_split: Optional[float] = None, + validation_fraction_split: Optional[float] = None, + test_fraction_split: Optional[float] = None, + predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, + weight_column: Optional[str] = None, + budget_milli_node_hours: int = 1000, + model_display_name: Optional[str] = None, + model_labels: Optional[Dict[str, str]] = None, + 
disable_early_stopping: bool = False, + export_evaluated_data_items: bool = False, + export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None, + export_evaluated_data_items_override_destination: bool = False, + sync: bool = True, + ) -> models.Model: + """ + Create an AutoML Tabular Training Job. + + :param project_id: Required. Project to run training in. + :param region: Required. Location to run training in. + :param display_name: Required. The user-defined name of this TrainingPipeline. + :param dataset: Required. The dataset within the same Project from which data will be used to train + the Model. The Dataset must use schema compatible with Model being trained, and what is + compatible should be described in the used TrainingPipeline's [training_task_definition] + [google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. For tabular + Datasets, all their data is exported to training, to pick and choose from. + :param target_column: Required. The name of the column values of which the Model is to predict. + :param optimization_prediction_type: The type of prediction the Model is to produce. + "classification" - Predict one out of multiple target values is picked for each row. + "regression" - Predict a value based on its relation to other values. This type is available only + to columns that contain semantically numeric values, i.e. integers or floating point number, even + if stored as e.g. strings. + :param optimization_objective: Optional. Objective function the Model is to be optimized towards. + The training task creates a Model that maximizes/minimizes the value of the objective function + over the validation set. + + The supported optimization objectives depend on the prediction type, and in the case of + classification also the number of distinct values in the target column (two distinct values + -> binary, 3 or more distinct values -> multi class). If the field is not set, the default + objective function is used. 
+ + Classification (binary): + "maximize-au-roc" (default) - Maximize the area under the receiver operating characteristic (ROC) + curve. + "minimize-log-loss" - Minimize log loss. + "maximize-au-prc" - Maximize the area under the precision-recall curve. + "maximize-precision-at-recall" - Maximize precision for a specified recall value. + "maximize-recall-at-precision" - Maximize recall for a specified precision value. + + Classification (multi class): + "minimize-log-loss" (default) - Minimize log loss. + + Regression: + "minimize-rmse" (default) - Minimize root-mean-squared error (RMSE). + "minimize-mae" - Minimize mean-absolute error (MAE). + "minimize-rmsle" - Minimize root-mean-squared log error (RMSLE). + :param column_specs: Optional. Alternative to column_transformations where the keys of the dict are + column names and their respective values are one of AutoMLTabularTrainingJob.column_data_types. + When creating transformation for BigQuery Struct column, the column should be flattened using "." + as the delimiter. Only columns with no child should have a transformation. If an input column has + no transformations on it, such a column is ignored by the training, except for the targetColumn, + which should have no transformations defined on. Only one of column_transformations or + column_specs should be passed. + :param column_transformations: Optional. Transformations to apply to the input columns (i.e. columns + other than the targetColumn). Each transformation may produce multiple result values from the + column's value, and all are used for training. When creating transformation for BigQuery Struct + column, the column should be flattened using "." as the delimiter. Only columns with no child + should have a transformation. If an input column has no transformations on it, such a column is + ignored by the training, except for the targetColumn, which should have no transformations + defined on. 
Only one of column_transformations or column_specs should be passed. Consider using + column_specs as column_transformations will be deprecated eventually. Review Comment: @eladkal I have added a deprecation warning for this parameter in this PR https://github.com/apache/airflow/pull/24467 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
