e-galan commented on code in PR #40685:
URL: https://github.com/apache/airflow/pull/40685#discussion_r1675507903
##########
airflow/providers/google/cloud/operators/vertex_ai/custom_job.py:
##########
@@ -180,6 +180,40 @@ def execute(self, context: Context) -> None:
stacklevel=2,
)
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
dict[str, Any] | None:
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ training_pipeline = event["job"]
+ custom_job_id =
self.hook.extract_custom_job_id_from_training_pipeline(training_pipeline)
+ self.xcom_push(context, key="custom_job_id", value=custom_job_id)
+ try:
+ model = training_pipeline["model_to_upload"]
+ model_id = self.hook.extract_model_id(model)
+ self.xcom_push(context, key="model_id", value=model_id)
+ VertexAIModelLink.persist(context=context, task_instance=self,
model_id=model_id)
+ return model
+ except KeyError:
+ self.log.warning(
+ "It is impossible to get the Model. "
+ "The Training Pipeline did not produce a Managed Model because
it was not "
+ "configured to upload a Model. Please ensure that the
'model_serving_container_image_uri' "
+ "and 'model_display_name' parameters are passed in when
creating a Training Pipeline, "
+ "and check that your training script saves the model to
os.environ['AIP_MODEL_DIR']."
+ )
+ return None
Review Comment:
In this PR I am mostly trying to reproduce the sync mode behavior.
The sync mode allows results that do not contain a managed model data. If a
training pipeline produces such results, the operators log a warning message
and then return `None`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]