shahar1 commented on code in PR #40685:
URL: https://github.com/apache/airflow/pull/40685#discussion_r1675736000
##########
airflow/providers/google/cloud/operators/vertex_ai/custom_job.py:
##########
@@ -180,6 +180,40 @@ def execute(self, context: Context) -> None:
stacklevel=2,
)
+ def execute_complete(self, context: Context, event: dict[str, Any]) -> dict[str, Any] | None:
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ training_pipeline = event["job"]
+ custom_job_id = self.hook.extract_custom_job_id_from_training_pipeline(training_pipeline)
+ self.xcom_push(context, key="custom_job_id", value=custom_job_id)
+ try:
+ model = training_pipeline["model_to_upload"]
+ model_id = self.hook.extract_model_id(model)
+ self.xcom_push(context, key="model_id", value=model_id)
+ VertexAIModelLink.persist(context=context, task_instance=self, model_id=model_id)
+ return model
+ except KeyError:
+ self.log.warning(
+ "It is impossible to get the Model. "
+ "The Training Pipeline did not produce a Managed Model because it was not "
+ "configured to upload a Model. Please ensure that the 'model_serving_container_image_uri' "
+ "and 'model_display_name' parameters are passed in when creating a Training Pipeline, "
+ "and check that your training script saves the model to os.environ['AIP_MODEL_DIR']."
+ )
+ return None
Review Comment:
Maybe an additional comment about it would be nice to have :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]