This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c9bd753a33 Fix system tests for dataflow, vision and vertex_ai (#38326)
c9bd753a33 is described below
commit c9bd753a33e2adc73831beffdd27f7e5ee014791
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Mar 20 16:43:08 2024 +0100
Fix system tests for dataflow, vision and vertex_ai (#38326)
---
.../dataflow/example_dataflow_native_python.py | 21 +--
.../example_dataflow_native_python_async.py | 17 +--
.../dataflow/example_dataflow_streaming_python.py | 4 +-
.../dataflow/resources/wordcount_debugging.txt | 168 ---------------------
.../cloud/vertex_ai/example_vertex_ai_dataset.py | 6 +-
.../cloud/vision/example_vision_annotate_image.py | 2 +-
6 files changed, 13 insertions(+), 205 deletions(-)
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
index dc8449cb31..346ba76db0 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
@@ -24,27 +24,24 @@ from __future__ import annotations
import os
from datetime import datetime
-from pathlib import Path
from airflow.models.dag import DAG
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataflow_native_python"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-PYTHON_FILE_NAME = "wordcount_debugging.py"
GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
LOCATION = "europe-west3"
default_args = {
@@ -64,13 +61,6 @@ with DAG(
) as dag:
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
- upload_file = LocalFilesystemToGCSOperator(
- task_id="upload_file_to_bucket",
- src=PYTHON_FILE_LOCAL_PATH,
- dst=PYTHON_FILE_NAME,
- bucket=BUCKET_NAME,
- )
-
# [START howto_operator_start_python_job]
start_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
@@ -80,10 +70,10 @@ with DAG(
pipeline_options={
"output": GCS_OUTPUT,
},
- py_requirements=["apache-beam[gcp]==2.46.0"],
+ py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
- dataflow_config={"location": LOCATION},
+ dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
# [END howto_operator_start_python_job]
@@ -94,7 +84,7 @@ with DAG(
pipeline_options={
"output": GCS_OUTPUT,
},
- py_requirements=["apache-beam[gcp]==2.46.0"],
+ py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
@@ -114,7 +104,6 @@ with DAG(
(
# TEST SETUP
create_bucket
- >> upload_file
# TEST BODY
>> start_python_job
>> start_python_job_local
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
index 01a7bce74c..c7c2e62e76 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
@@ -24,7 +24,6 @@ from __future__ import annotations
import os
from datetime import datetime
-from pathlib import Path
from typing import Callable
from airflow.exceptions import AirflowException
@@ -39,20 +38,18 @@ from airflow.providers.google.cloud.sensors.dataflow import (
DataflowJobMetricsSensor,
DataflowJobStatusSensor,
)
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataflow_native_python_async"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-PYTHON_FILE_NAME = "wordcount_debugging.txt"
GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
LOCATION = "europe-west3"
default_args = {
@@ -72,13 +69,6 @@ with DAG(
) as dag:
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
- upload_file = LocalFilesystemToGCSOperator(
- task_id="upload_file_to_bucket",
- src=PYTHON_FILE_LOCAL_PATH,
- dst=PYTHON_FILE_NAME,
- bucket=BUCKET_NAME,
- )
-
# [START howto_operator_start_python_job_async]
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_async",
@@ -88,7 +78,7 @@ with DAG(
pipeline_options={
"output": GCS_OUTPUT,
},
- py_requirements=["apache-beam[gcp]==2.46.0"],
+ py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={
@@ -174,7 +164,6 @@ with DAG(
(
# TEST SETUP
create_bucket
- >> upload_file
# TEST BODY
>> start_python_job_async
>> [
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
index 63e5c780e3..a858ad90aa 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -82,10 +82,10 @@ with DAG(
"output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
"streaming": True,
},
- py_requirements=["apache-beam[gcp]==2.46.0"],
+ py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
- dataflow_config={"location": LOCATION},
+ dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
# [END howto_operator_start_streaming_python_job]
diff --git a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt b/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
deleted file mode 100644
index efe7d205c4..0000000000
--- a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
+++ /dev/null
@@ -1,168 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-An example that verifies the counts and includes best practices.
-
-On top of the basic concepts in the wordcount example, this workflow introduces
-logging to Cloud Logging, and using assertions in a Dataflow pipeline.
-To execute this pipeline locally, specify a local output file or output prefix
-on GCS::
-
- --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
-
-To execute this pipeline using the Google Cloud Dataflow service, specify
-pipeline configuration::
-
- --project YOUR_PROJECT_ID
- --staging_location gs://YOUR_STAGING_DIRECTORY
- --temp_location gs://YOUR_TEMP_DIRECTORY
- --region GCE_REGION
- --job_name YOUR_JOB_NAME
- --runner DataflowRunner
-
-and an output prefix on GCS.
-"""
-
-# pytype: skip-file
-
-from __future__ import annotations
-
-import argparse
-import logging
-import re
-
-import apache_beam as beam
-from apache_beam.io import ReadFromText, WriteToText
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
-from apache_beam.testing.util import assert_that, equal_to
-
-
-class FilterTextFn(beam.DoFn):
- """A DoFn that filters for a specific key based on a regular expression."""
-
- def __init__(self, pattern):
- # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
- # super(FilterTextFn, self).__init__()
- beam.DoFn.__init__(self)
- self.pattern = pattern
- # A custom metric can track values in your pipeline as it runs. Those
- # values will be available in the monitoring system of the runner used
- # to run the pipeline. These metrics below track the number of
- # matched and unmatched words.
- self.matched_words = Metrics.counter(self.__class__, "matched_words")
- self.umatched_words = Metrics.counter(self.__class__, "umatched_words")
-
- def process(self, element):
- word, _ = element
- if re.match(self.pattern, word):
- # Log at INFO level each element we match. When executing this pipeline
- # using the Dataflow service, these log lines will appear in the Cloud
- # Logging UI.
- logging.info("Matched %s", word)
- self.matched_words.inc()
- yield element
- else:
- # Log at the "DEBUG" level each element that is not matched. Different log
- # levels can be used to control the verbosity of logging providing an
- # effective mechanism to filter less important information.
- # Note currently only "INFO" and higher level logs are emitted to the
- # Cloud Logger. This log message will not be visible in the Cloud Logger.
- logging.debug("Did not match %s", word)
- self.umatched_words.inc()
-
-
-class CountWords(beam.PTransform):
- """
- A transform to count the occurrences of each word.
-
- A PTransform that converts a PCollection containing lines of text into a
- PCollection of (word, count) tuples.
- """
-
- def expand(self, pcoll):
- def count_ones(word_ones):
- (word, ones) = word_ones
- return (word, sum(ones))
-
- return (
- pcoll
- | "split" >> (beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(str))
- | "pair_with_one" >> beam.Map(lambda x: (x, 1))
- | "group" >> beam.GroupByKey()
- | "count" >> beam.Map(count_ones)
- )
-
-
-def run(argv=None, save_main_session=True):
- """Run the debugging wordcount pipeline."""
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "--input",
- dest="input",
- default="gs://dataflow-samples/shakespeare/kinglear.txt",
- help="Input file to process.",
- )
- parser.add_argument("--output", dest="output", required=True, help="Output file to write results to.")
- known_args, pipeline_args = parser.parse_known_args(argv)
- # We use the save_main_session option because one or more DoFn's in this
- # workflow rely on global context (e.g., a module imported at module level).
- pipeline_options = PipelineOptions(pipeline_args)
- pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
- with beam.Pipeline(options=pipeline_options) as p:
-
- # Read the text file[pattern] into a PCollection, count the occurrences of
- # each word and filter by a list of words.
- filtered_words = (
- p
- | "read" >> ReadFromText(known_args.input)
- | CountWords()
- | "FilterText" >> beam.ParDo(FilterTextFn("Flourish|stomach"))
- )
-
- # assert_that is a convenient PTransform that checks a PCollection has an
- # expected value. Asserts are best used in unit tests with small data sets
- # but is demonstrated here as a teaching tool.
- #
- # Note assert_that does not provide any output and that successful
- # completion of the Pipeline implies that the expectations were met. Learn
- # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
- # on how to best test your pipeline.
- assert_that(filtered_words, equal_to([("Flourish", 3), ("stomach", 1)]))
-
- # Format the counts into a PCollection of strings and write the output using
- # a "Write" transform that has side effects.
- # pylint: disable=unused-variable
- def format_result(word_count):
- (word, count) = word_count
- return f"{word}: {count}"
-
- _ = filtered_words | "format" >> beam.Map(format_result) | "write" >> WriteToText(known_args.output)
-
-
-if __name__ == "__main__":
- # Cloud Logging would contain only logging.INFO and higher level logs logged
- # by the root logger. All log statements emitted by the root logger will be
- # visible in the Cloud Logging UI. Learn more at
- # https://cloud.google.com/logging about the Cloud Logging UI.
- #
- # You can set the default logging level to a different level when running
- # locally.
- logging.getLogger().setLevel(logging.INFO)
- run()
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
index e7f5d7d920..8ad202f292 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -100,10 +100,8 @@ TEST_IMPORT_CONFIG = [
"data_item_labels": {
"test-labels-name": "test-labels-value",
},
- "import_schema_uri": (
- "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
- ),
- "gcs_source": {"uris": ["gs://cloud-samples-data/vision/salads.csv"]},
+ "import_schema_uri": "image_classification_single_label_io_format_1.0.0.yaml",
+ "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset-flowers.csv"]},
},
]
DATASET_TO_UPDATE = {"display_name": "test-name"}
diff --git a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
index a72c296d90..dfef99c465 100644
--- a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
+++ b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
@@ -71,7 +71,7 @@ DETECT_IMAGE = {"source": {"image_uri": GCP_VISION_ANNOTATE_IMAGE_URL}}
# Public bucket holding the sample data
BUCKET_NAME_SRC = "cloud-samples-data"
# Path to the data inside the public bucket
-PATH_SRC = "vision/ocr/sign.jpg"
+PATH_SRC = "vision/logo/google_logo.jpg"
with DAG(