This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 047d139549 Update VertexAI system tests (#41052)
047d139549 is described below
commit 047d139549daa3f3bd71e4ce37df2ea6998150e7
Author: Maksim <[email protected]>
AuthorDate: Fri Jul 26 14:19:48 2024 -0700
Update VertexAI system tests (#41052)
---
...ample_vertex_ai_auto_ml_forecasting_training.py | 43 ++++++---------------
.../example_vertex_ai_auto_ml_image_training.py | 44 +++++----------------
.../example_vertex_ai_auto_ml_list_training.py | 7 ++++
.../example_vertex_ai_auto_ml_tabular_training.py | 44 +++++----------------
.../example_vertex_ai_auto_ml_text_training.py | 45 +++++-----------------
.../example_vertex_ai_auto_ml_video_training.py | 44 +++++----------------
.../example_vertex_ai_batch_prediction_job.py | 24 +++++-------
.../example_vertex_ai_custom_container.py | 7 ++++
.../vertex_ai/example_vertex_ai_custom_job.py | 15 +++++++-
.../example_vertex_ai_custom_job_python_package.py | 7 ++++
.../cloud/vertex_ai/example_vertex_ai_dataset.py | 30 +++++++--------
.../cloud/vertex_ai/example_vertex_ai_endpoint.py | 44 +++++----------------
.../example_vertex_ai_generative_model.py | 2 +-
.../example_vertex_ai_hyperparameter_tuning_job.py | 9 ++++-
.../example_vertex_ai_list_custom_jobs.py | 7 ++++
.../vertex_ai/example_vertex_ai_model_service.py | 16 +++++++-
.../vertex_ai/example_vertex_ai_pipeline_job.py | 9 ++++-
17 files changed, 159 insertions(+), 238 deletions(-)
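The recurring edit across the 17 files below: each test now reads its dataset directly from the shared airflow-system-tests-resources bucket instead of creating a per-test bucket, syncing data into it, and deleting it in teardown; each DAG also gains the standard watcher tail. Schematically (a condensed sketch of the pattern, not code from the commit):

RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"

# Before: a per-test bucket was created, populated via
# GCSSynchronizeBucketsOperator, and deleted in teardown.
# FORECAST_GCS_BUCKET_NAME = f"bucket_forecast_{DAG_ID}_{ENV_ID}".replace("_", "-")
# gcs_source = {"uri": [f"gs://{FORECAST_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}

# After: the shared resources bucket is read directly, so the bucket
# lifecycle tasks disappear from the DAGs.
gcs_source = {"uri": [f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/forecast-dataset.csv"]}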
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
index eb2fecedb4..03634b58f6 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py
@@ -31,11 +31,6 @@ from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLForecastingTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
REGION = "us-central1"
FORECASTING_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-FORECAST_GCS_BUCKET_NAME = f"bucket_forecast_{DAG_ID}_{ENV_ID}".replace("_", "-")
FORECAST_DATASET = {
"display_name": f"forecast-dataset-{ENV_ID}",
@@ -62,7 +56,9 @@ FORECAST_DATASET = {
"metadata": ParseDict(
{
"input_config": {
- "gcs_source": {"uri":
[f"gs://{FORECAST_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+ "gcs_source": {
+ "uri":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/forecast-dataset.csv"]
+ }
}
},
Value(),
@@ -89,22 +85,6 @@ with DAG(
catchup=False,
tags=["example", "vertex_ai", "auto_ml"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=FORECAST_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=FORECAST_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_forecast_dataset = CreateDatasetOperator(
task_id="forecast_dataset",
dataset=FORECAST_DATASET,
@@ -157,23 +137,24 @@ with DAG(
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
-     task_id="delete_bucket", bucket_name=FORECAST_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
- )
(
# TEST SETUP
- create_bucket
- >> move_dataset_file
- >> create_forecast_dataset
+ create_forecast_dataset
# TEST BODY
>> create_auto_ml_forecasting_training_job
# TEST TEARDOWN
>> delete_auto_ml_forecasting_training_job
>> delete_forecast_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
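The watcher tail added above (and to every other DAG in this commit) deserves a word: because teardown tasks run with TriggerRule.ALL_DONE, a failed test body can still end in a "successful" final task. A minimal self-contained sketch of the pattern, assuming the tests.system.utils.watcher helper from the Airflow source tree (the DAG and task names here are hypothetical):

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    "example_watcher_pattern",
    schedule="@once",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    test_body = EmptyOperator(task_id="test_body")
    # ALL_DONE makes teardown run even when test_body fails, so teardown's
    # success would otherwise be taken as the DAG run's final word.
    teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)
    test_body >> teardown

    from tests.system.utils.watcher import watcher

    # watcher() sits downstream of every task and propagates any task
    # failure to the DAG run, so pytest reports the failure correctly.
    list(dag.tasks) >> watcher()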
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
index 4de1f43dd8..c26ea94325 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py
@@ -30,11 +30,6 @@ from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLImageTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
REGION = "us-central1"
IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-IMAGE_GCS_BUCKET_NAME = f"bucket_image_{DAG_ID}_{ENV_ID}".replace("_", "-")
IMAGE_DATASET = {
"display_name": f"image-dataset-{ENV_ID}",
@@ -64,7 +58,7 @@ IMAGE_DATASET = {
IMAGE_DATA_CONFIG = [
{
"import_schema_uri":
schema.dataset.ioformat.image.single_label_classification,
- "gcs_source": {"uris":
[f"gs://{IMAGE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+ "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/flowers-dataset.csv"]},
},
]
@@ -76,22 +70,6 @@ with DAG(
catchup=False,
tags=["example", "vertex_ai", "auto_ml"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=IMAGE_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=IMAGE_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_image_dataset = CreateDatasetOperator(
task_id="image_dataset",
dataset=IMAGE_DATASET,
@@ -143,27 +121,25 @@ with DAG(
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=IMAGE_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
(
# TEST SETUP
- [
- create_bucket >> move_dataset_file,
- create_image_dataset,
- ]
+ create_image_dataset
>> import_image_dataset
# TEST BODY
>> create_auto_ml_image_training_job
# TEST TEARDOWN
>> delete_auto_ml_image_training_job
>> delete_image_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
index fcf67c5210..618182b94e 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py
@@ -50,6 +50,13 @@ with DAG(
)
# [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
index 8c828bd4f2..91260eccde 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py
@@ -31,11 +31,6 @@ from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLTabularTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
REGION = "us-central1"
TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"
MODEL_DISPLAY_NAME = "adopted-prediction-model"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-TABULAR_GCS_BUCKET_NAME = f"bucket_tabular_{DAG_ID}_{ENV_ID}".replace("_", "-")
TABULAR_DATASET = {
"display_name": f"tabular-dataset-{ENV_ID}",
@@ -62,7 +56,7 @@ TABULAR_DATASET = {
"metadata": ParseDict(
{
"input_config": {
- "gcs_source": {"uri":
[f"gs://{TABULAR_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+ "gcs_source": {"uri":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/tabular-dataset.csv"]}
}
},
Value(),
@@ -91,22 +85,6 @@ with DAG(
catchup=False,
tags=["example", "vertex_ai", "auto_ml"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=TABULAR_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=TABULAR_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_tabular_dataset = CreateDatasetOperator(
task_id="tabular_dataset",
dataset=TABULAR_DATASET,
@@ -150,25 +128,23 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=TABULAR_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
(
# TEST SETUP
- create_bucket
- >> move_dataset_file
- >> create_tabular_dataset
+ create_tabular_dataset
# TEST BODY
>> create_auto_ml_tabular_training_job
# TEST TEARDOWN
>> delete_auto_ml_tabular_training_job
>> delete_tabular_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
index 9a7e3e95cc..b91a8cd969 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_text_training.py
@@ -30,11 +30,6 @@ from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLTextTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -48,13 +43,12 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
REGION = "us-central1"
TEXT_DISPLAY_NAME = f"auto-ml-text-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-text-model-{ENV_ID}"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-TEXT_GCS_BUCKET_NAME = f"bucket_text_{DAG_ID}_{ENV_ID}".replace("_", "-")
TEXT_DATASET = {
"display_name": f"text-dataset-{ENV_ID}",
@@ -64,7 +58,7 @@ TEXT_DATASET = {
TEXT_DATA_CONFIG = [
{
"import_schema_uri":
schema.dataset.ioformat.text.single_label_classification,
- "gcs_source": {"uris":
[f"gs://{TEXT_GCS_BUCKET_NAME}/vertex-ai/text-dataset.csv"]},
+ "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/text-dataset.csv"]},
},
]
@@ -75,22 +69,6 @@ with DAG(
catchup=False,
tags=["example", "vertex_ai", "auto_ml"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=TEXT_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=TEXT_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_text_dataset = CreateDatasetOperator(
task_id="text_dataset",
dataset=TEXT_DATASET,
@@ -140,27 +118,24 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=TEXT_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
(
# TEST SETUP
- [
- create_bucket >> move_dataset_file,
- create_text_dataset,
- ]
+ create_text_dataset
>> import_text_dataset
# TEST BODY
>> create_auto_ml_text_training_job
# TEST TEARDOWN
>> delete_auto_ml_text_training_job
>> delete_text_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
index 1cbf9a250b..cde6bb183e 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py
@@ -30,11 +30,6 @@ from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLVideoTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -48,7 +43,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_auto_ml_operations"
+DAG_ID = "vertex_ai_auto_ml_operations"
REGION = "us-central1"
VIDEO_DISPLAY_NAME = f"auto-ml-video-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-video-model-{ENV_ID}"
@@ -64,7 +59,7 @@ VIDEO_DATASET = {
VIDEO_DATA_CONFIG = [
{
"import_schema_uri": schema.dataset.ioformat.video.classification,
- "gcs_source": {"uris":
[f"gs://{VIDEO_GCS_BUCKET_NAME}/vertex-ai/video-dataset.csv"]},
+ "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/video-dataset.csv"]},
},
]
@@ -75,22 +70,6 @@ with DAG(
catchup=False,
tags=["example", "vertex_ai", "auto_ml"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=VIDEO_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=VIDEO_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_video_dataset = CreateDatasetOperator(
task_id="video_dataset",
dataset=VIDEO_DATASET,
@@ -152,18 +131,9 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=VIDEO_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
(
# TEST SETUP
- [
- create_bucket >> move_dataset_file,
- create_video_dataset,
- ]
+ create_video_dataset
>> import_video_dataset
# TEST BODY
>> create_auto_ml_video_training_job
@@ -171,9 +141,15 @@ with DAG(
# TEST TEARDOWN
>> delete_auto_ml_video_training_job
>> delete_video_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
index 1b8c2e182c..3f2dfc60ec 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py
@@ -34,7 +34,6 @@ from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLForecastingTrainingJobOperator,
@@ -53,7 +52,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_batch_prediction_operations"
+DAG_ID = "vertex_ai_batch_prediction_operations"
REGION = "us-central1"
FORECAST_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
@@ -62,7 +61,7 @@ MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
JOB_DISPLAY_NAME = f"batch_prediction_job_test_{ENV_ID}"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
-DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
+DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/datasets/forecast-dataset.csv"
FORECAST_DATASET = {
"display_name": f"forecast-dataset-{ENV_ID}",
@@ -70,7 +69,7 @@ FORECAST_DATASET = {
"metadata": ParseDict(
{
"input_config": {
- "gcs_source": {"uri":
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
+ "gcs_source": {"uri":
[f"gs://{RESOURCE_DATA_BUCKET}/{DATA_SAMPLE_GCS_OBJECT_NAME}"]}
}
},
Value(),
@@ -108,15 +107,6 @@ with DAG(
location=REGION,
)
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_forecast_dataset = CreateDatasetOperator(
task_id="forecast_dataset",
dataset=FORECAST_DATASET,
@@ -227,7 +217,6 @@ with DAG(
(
# TEST SETUP
create_bucket
- >> move_dataset_file
>> create_forecast_dataset
>> create_auto_ml_forecasting_training_job
# TEST BODY
@@ -240,6 +229,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
index ff642ae953..f039877f71 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
@@ -207,6 +207,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
index c90c1aac23..9b67a3480b 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
@@ -72,7 +72,12 @@ CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
REPLICA_COUNT = 1
-LOCAL_TRAINING_SCRIPT_PATH = "california_housing_training_script.py"
+# VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH should be set when Airflow runs on a distributed system.
+# For example, in Composer the correct path is `gcs/data/california_housing_training_script.py`,
+# because `gcs/data/` is a folder shared between Airflow's workers.
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get(
+    "VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH", "california_housing_training_script.py"
+)
with DAG(
@@ -244,6 +249,14 @@ with DAG(
)
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
from tests.system.utils import get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
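The VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH comment in the hunk above explains the override; here is a short sketch of the same resolution logic with an explicit existence check (the check is illustrative, not part of the commit):

import os

# Same resolution as in the updated tests: fall back to a repo-relative path
# when the override is not set.
LOCAL_TRAINING_SCRIPT_PATH = os.environ.get(
    "VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH", "california_housing_training_script.py"
)

# On Composer, the override would typically point into the worker-shared
# folder, e.g. "gcs/data/california_housing_training_script.py".
if not os.path.exists(LOCAL_TRAINING_SCRIPT_PATH):
    raise FileNotFoundError(
        f"training script not found at {LOCAL_TRAINING_SCRIPT_PATH!r}; "
        "set VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH on distributed deployments"
    )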
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
index c9fd96aee9..33105d273f 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
@@ -209,6 +209,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
index 8ad202f292..77b69081a4 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -34,7 +34,6 @@ from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
CreateDatasetOperator,
@@ -49,7 +48,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_dataset_operations"
+DAG_ID = "vertex_ai_dataset_operations"
REGION = "us-central1"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
@@ -61,7 +60,9 @@ TIME_SERIES_DATASET = {
"metadata": ParseDict(
{
"input_config": {
- "gcs_source": {"uri":
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/forecast-dataset.csv"]}
+ "gcs_source": {
+ "uri":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/forecast-dataset.csv"]
+ }
}
},
Value(),
@@ -78,7 +79,7 @@ TABULAR_DATASET = {
"metadata": ParseDict(
{
"input_config": {
- "gcs_source": {"uri":
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/tabular-dataset.csv"]}
+ "gcs_source": {"uri":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/tabular-dataset.csv"]}
}
},
Value(),
@@ -100,8 +101,8 @@ TEST_IMPORT_CONFIG = [
"data_item_labels": {
"test-labels-name": "test-labels-value",
},
- "import_schema_uri":
"image_classification_single_label_io_format_1.0.0.yaml",
- "gcs_source": {"uris":
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset-flowers.csv"]},
+ "import_schema_uri":
schema.dataset.ioformat.image.single_label_classification,
+ "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/image-dataset-flowers.csv"]},
},
]
DATASET_TO_UPDATE = {"display_name": "test-name"}
@@ -122,15 +123,6 @@ with DAG(
location=REGION,
)
- move_datasets_files = GCSSynchronizeBucketsOperator(
- task_id="move_datasets_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
# [START how_to_cloud_vertex_ai_create_dataset_operator]
create_image_dataset_job = CreateDatasetOperator(
task_id="image_dataset",
@@ -262,7 +254,6 @@ with DAG(
(
# TEST SETUP
create_bucket
- >> move_datasets_files
# TEST BODY
>> [
create_time_series_dataset_job >> delete_time_series_dataset_job,
@@ -276,6 +267,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
index 297470be68..8fa802b517 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_endpoint.py
@@ -30,11 +30,6 @@ from google.cloud.aiplatform import schema
from google.protobuf.struct_pb2 import Value
from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
CreateAutoMLImageTrainingJobOperator,
DeleteAutoMLTrainingJobOperator,
@@ -55,13 +50,12 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_endpoint_service_operations"
+DAG_ID = "vertex_ai_endpoint_service_operations"
REGION = "us-central1"
IMAGE_DISPLAY_NAME = f"auto-ml-image-{ENV_ID}"
MODEL_DISPLAY_NAME = f"auto-ml-image-model-{ENV_ID}"
RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
IMAGE_DATASET = {
"display_name": f"image-dataset-{ENV_ID}",
@@ -71,7 +65,7 @@ IMAGE_DATASET = {
IMAGE_DATA_CONFIG = [
{
"import_schema_uri":
schema.dataset.ioformat.image.single_label_classification,
- "gcs_source": {"uris":
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset.csv"]},
+ "gcs_source": {"uris":
[f"gs://{RESOURCE_DATA_BUCKET}/vertex-ai/datasets/flowers-dataset.csv"]},
},
]
@@ -88,22 +82,6 @@ with DAG(
render_template_as_native_obj=True,
tags=["example", "vertex_ai", "endpoint_service"],
) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
- storage_class="REGIONAL",
- location=REGION,
- )
-
- move_dataset_file = GCSSynchronizeBucketsOperator(
- task_id="move_dataset_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="vertex-ai/datasets",
- destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
- destination_object="vertex-ai",
- recursive=True,
- )
-
create_image_dataset = CreateDatasetOperator(
task_id="image_dataset",
dataset=IMAGE_DATASET,
@@ -209,18 +187,10 @@ with DAG(
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
- trigger_rule=TriggerRule.ALL_DONE,
- )
(
# TEST SETUP
- [
- create_bucket >> move_dataset_file,
- create_image_dataset,
- ]
+ create_image_dataset
>> import_image_dataset
>> create_auto_ml_image_training_job
# TEST BODY
@@ -232,9 +202,15 @@ with DAG(
# TEST TEARDOWN
>> delete_auto_ml_image_training_job
>> delete_image_dataset
- >> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
index 524330f4dc..fdacfef2be 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_generative_model.py
@@ -35,7 +35,7 @@ from airflow.providers.google.cloud.operators.vertex_ai.generative_model import (
)
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_generative_model_dag"
+DAG_ID = "vertex_ai_generative_model_dag"
REGION = "us-central1"
PROMPT = "In 10 words or less, why is Apache Airflow amazing?"
CONTENTS = [PROMPT]
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
index d0cb145843..913fff2b4e 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py
@@ -40,7 +40,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_hyperparameter_tuning_job_operations"
+DAG_ID = "vertex_ai_hyperparameter_tuning_job_operations"
REGION = "us-central1"
DISPLAY_NAME = f"hyperparameter-tuning-job-{ENV_ID}"
@@ -179,6 +179,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
index f3f761586c..b4f8522a57 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py
@@ -49,6 +49,13 @@ with DAG(
)
# [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
index 003f7fe70d..4560d9f54f 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_model_service.py
@@ -61,7 +61,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_model_service_operations"
+DAG_ID = "vertex_ai_model_service_operations"
REGION = "us-central1"
TRAIN_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
@@ -87,7 +87,12 @@ TABULAR_DATASET = {
CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
-LOCAL_TRAINING_SCRIPT_PATH = "california_housing_training_script.py"
+# VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH should be set when Airflow runs on a distributed system.
+# For example, in Composer the correct path is `gcs/data/california_housing_training_script.py`,
+# because `gcs/data/` is a folder shared between Airflow's workers.
+LOCAL_TRAINING_SCRIPT_PATH = os.environ.get(
+    "VERTEX_AI_LOCAL_TRAINING_SCRIPT_PATH", "california_housing_training_script.py"
+)
MODEL_OUTPUT_CONFIG = {
"artifact_destination": {
@@ -323,6 +328,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
index baf5b498b7..130effbb86 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py
@@ -44,7 +44,7 @@ from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_vertex_ai_pipeline_job_operations"
+DAG_ID = "vertex_ai_pipeline_job_operations"
REGION = "us-central1"
DISPLAY_NAME = f"pipeline-job-{ENV_ID}"
@@ -159,6 +159,13 @@ with DAG(
>> delete_bucket
)
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
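Every hunk above truncates at the trailing get_test_run import. For reference, the standard tail of an Airflow system-test module looks roughly like this (per tests/system/README.md#run_via_pytest; the exact body is not shown in this diff):

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)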