This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c9bd753a33 Fix system tests for dataflow, vision and vertex_ai (#38326)
c9bd753a33 is described below

commit c9bd753a33e2adc73831beffdd27f7e5ee014791
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Mar 20 16:43:08 2024 +0100

    Fix system tests for dataflow, vision and vertex_ai (#38326)
---
 .../dataflow/example_dataflow_native_python.py     |  21 +--
 .../example_dataflow_native_python_async.py        |  17 +--
 .../dataflow/example_dataflow_streaming_python.py  |   4 +-
 .../dataflow/resources/wordcount_debugging.txt     | 168 ---------------------
 .../cloud/vertex_ai/example_vertex_ai_dataset.py   |   6 +-
 .../cloud/vision/example_vision_annotate_image.py  |   2 +-
 6 files changed, 13 insertions(+), 205 deletions(-)

diff --git 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
index dc8449cb31..346ba76db0 100644
--- 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
+++ 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
@@ -24,27 +24,24 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from airflow.models.dag import DAG
 from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
 from airflow.providers.apache.beam.operators.beam import 
BeamRunPythonPipelineOperator
 from airflow.providers.google.cloud.operators.dataflow import 
DataflowStopJobOperator
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataflow_native_python"
 
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 
-PYTHON_FILE_NAME = "wordcount_debugging.py"
 GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
 GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / 
PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = 
f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
 LOCATION = "europe-west3"
 
 default_args = {
@@ -64,13 +61,6 @@ with DAG(
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", 
bucket_name=BUCKET_NAME)
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=PYTHON_FILE_LOCAL_PATH,
-        dst=PYTHON_FILE_NAME,
-        bucket=BUCKET_NAME,
-    )
-
     # [START howto_operator_start_python_job]
     start_python_job = BeamRunPythonPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
@@ -80,10 +70,10 @@ with DAG(
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
-        dataflow_config={"location": LOCATION},
+        dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
     )
     # [END howto_operator_start_python_job]
 
@@ -94,7 +84,7 @@ with DAG(
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -114,7 +104,6 @@ with DAG(
     (
         # TEST SETUP
         create_bucket
-        >> upload_file
         # TEST BODY
         >> start_python_job
         >> start_python_job_local
diff --git 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
index 01a7bce74c..c7c2e62e76 100644
--- 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
+++ 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
@@ -24,7 +24,6 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
-from pathlib import Path
 from typing import Callable
 
 from airflow.exceptions import AirflowException
@@ -39,20 +38,18 @@ from airflow.providers.google.cloud.sensors.dataflow import 
(
     DataflowJobMetricsSensor,
     DataflowJobStatusSensor,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataflow_native_python_async"
 
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 
-PYTHON_FILE_NAME = "wordcount_debugging.txt"
 GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
 GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / 
PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = 
f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
 LOCATION = "europe-west3"
 
 default_args = {
@@ -72,13 +69,6 @@ with DAG(
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", 
bucket_name=BUCKET_NAME)
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=PYTHON_FILE_LOCAL_PATH,
-        dst=PYTHON_FILE_NAME,
-        bucket=BUCKET_NAME,
-    )
-
     # [START howto_operator_start_python_job_async]
     start_python_job_async = BeamRunPythonPipelineOperator(
         task_id="start_python_job_async",
@@ -88,7 +78,7 @@ with DAG(
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
         dataflow_config={
@@ -174,7 +164,6 @@ with DAG(
     (
         # TEST SETUP
         create_bucket
-        >> upload_file
         # TEST BODY
         >> start_python_job_async
         >> [
diff --git 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
index 63e5c780e3..a858ad90aa 100644
--- 
a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+++ 
b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -82,10 +82,10 @@ with DAG(
             "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
             "streaming": True,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
-        dataflow_config={"location": LOCATION},
+        dataflow_config={"location": LOCATION, "job_name": 
"start_python_job_streaming"},
     )
     # [END howto_operator_start_streaming_python_job]
 
diff --git 
a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
 
b/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
deleted file mode 100644
index efe7d205c4..0000000000
--- 
a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
+++ /dev/null
@@ -1,168 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-An example that verifies the counts and includes best practices.
-
-On top of the basic concepts in the wordcount example, this workflow introduces
-logging to Cloud Logging, and using assertions in a Dataflow pipeline.
-To execute this pipeline locally, specify a local output file or output prefix
-on GCS::
-
-  --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
-
-To execute this pipeline using the Google Cloud Dataflow service, specify
-pipeline configuration::
-
-  --project YOUR_PROJECT_ID
-  --staging_location gs://YOUR_STAGING_DIRECTORY
-  --temp_location gs://YOUR_TEMP_DIRECTORY
-  --region GCE_REGION
-  --job_name YOUR_JOB_NAME
-  --runner DataflowRunner
-
-and an output prefix on GCS.
-"""
-
-# pytype: skip-file
-
-from __future__ import annotations
-
-import argparse
-import logging
-import re
-
-import apache_beam as beam
-from apache_beam.io import ReadFromText, WriteToText
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
-from apache_beam.testing.util import assert_that, equal_to
-
-
-class FilterTextFn(beam.DoFn):
-    """A DoFn that filters for a specific key based on a regular expression."""
-
-    def __init__(self, pattern):
-        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on 
py3.
-        # super(FilterTextFn, self).__init__()
-        beam.DoFn.__init__(self)
-        self.pattern = pattern
-        # A custom metric can track values in your pipeline as it runs. Those
-        # values will be available in the monitoring system of the runner used
-        # to run the pipeline. These metrics below track the number of
-        # matched and unmatched words.
-        self.matched_words = Metrics.counter(self.__class__, "matched_words")
-        self.umatched_words = Metrics.counter(self.__class__, "umatched_words")
-
-    def process(self, element):
-        word, _ = element
-        if re.match(self.pattern, word):
-            # Log at INFO level each element we match. When executing this 
pipeline
-            # using the Dataflow service, these log lines will appear in the 
Cloud
-            # Logging UI.
-            logging.info("Matched %s", word)
-            self.matched_words.inc()
-            yield element
-        else:
-            # Log at the "DEBUG" level each element that is not matched. 
Different log
-            # levels can be used to control the verbosity of logging providing 
an
-            # effective mechanism to filter less important information.
-            # Note currently only "INFO" and higher level logs are emitted to 
the
-            # Cloud Logger. This log message will not be visible in the Cloud 
Logger.
-            logging.debug("Did not match %s", word)
-            self.umatched_words.inc()
-
-
-class CountWords(beam.PTransform):
-    """
-    A transform to count the occurrences of each word.
-
-    A PTransform that converts a PCollection containing lines of text into a
-    PCollection of (word, count) tuples.
-    """
-
-    def expand(self, pcoll):
-        def count_ones(word_ones):
-            (word, ones) = word_ones
-            return (word, sum(ones))
-
-        return (
-            pcoll
-            | "split" >> (beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", 
x)).with_output_types(str))
-            | "pair_with_one" >> beam.Map(lambda x: (x, 1))
-            | "group" >> beam.GroupByKey()
-            | "count" >> beam.Map(count_ones)
-        )
-
-
-def run(argv=None, save_main_session=True):
-    """Run the debugging wordcount pipeline."""
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--input",
-        dest="input",
-        default="gs://dataflow-samples/shakespeare/kinglear.txt",
-        help="Input file to process.",
-    )
-    parser.add_argument("--output", dest="output", required=True, help="Output 
file to write results to.")
-    known_args, pipeline_args = parser.parse_known_args(argv)
-    # We use the save_main_session option because one or more DoFn's in this
-    # workflow rely on global context (e.g., a module imported at module 
level).
-    pipeline_options = PipelineOptions(pipeline_args)
-    pipeline_options.view_as(SetupOptions).save_main_session = 
save_main_session
-    with beam.Pipeline(options=pipeline_options) as p:
-
-        # Read the text file[pattern] into a PCollection, count the 
occurrences of
-        # each word and filter by a list of words.
-        filtered_words = (
-            p
-            | "read" >> ReadFromText(known_args.input)
-            | CountWords()
-            | "FilterText" >> beam.ParDo(FilterTextFn("Flourish|stomach"))
-        )
-
-        # assert_that is a convenient PTransform that checks a PCollection has 
an
-        # expected value. Asserts are best used in unit tests with small data 
sets
-        # but is demonstrated here as a teaching tool.
-        #
-        # Note assert_that does not provide any output and that successful
-        # completion of the Pipeline implies that the expectations were  met. 
Learn
-        # more at 
https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
-        # on how to best test your pipeline.
-        assert_that(filtered_words, equal_to([("Flourish", 3), ("stomach", 
1)]))
-
-        # Format the counts into a PCollection of strings and write the output 
using
-        # a "Write" transform that has side effects.
-        # pylint: disable=unused-variable
-        def format_result(word_count):
-            (word, count) = word_count
-            return f"{word}: {count}"
-
-        _ = filtered_words | "format" >> beam.Map(format_result) | "write" >> 
WriteToText(known_args.output)
-
-
-if __name__ == "__main__":
-    # Cloud Logging would contain only logging.INFO and higher level logs 
logged
-    # by the root logger. All log statements emitted by the root logger will be
-    # visible in the Cloud Logging UI. Learn more at
-    # https://cloud.google.com/logging about the Cloud Logging UI.
-    #
-    # You can set the default logging level to a different level when running
-    # locally.
-    logging.getLogger().setLevel(logging.INFO)
-    run()
diff --git 
a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py 
b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
index e7f5d7d920..8ad202f292 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -100,10 +100,8 @@ TEST_IMPORT_CONFIG = [
         "data_item_labels": {
             "test-labels-name": "test-labels-value",
         },
-        "import_schema_uri": (
-            
"gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
-        ),
-        "gcs_source": {"uris": ["gs://cloud-samples-data/vision/salads.csv"]},
+        "import_schema_uri": 
"image_classification_single_label_io_format_1.0.0.yaml",
+        "gcs_source": {"uris": 
[f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset-flowers.csv"]},
     },
 ]
 DATASET_TO_UPDATE = {"display_name": "test-name"}
diff --git 
a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py 
b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
index a72c296d90..dfef99c465 100644
--- 
a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
+++ 
b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
@@ -71,7 +71,7 @@ DETECT_IMAGE = {"source": {"image_uri": 
GCP_VISION_ANNOTATE_IMAGE_URL}}
 # Public bucket holding the sample data
 BUCKET_NAME_SRC = "cloud-samples-data"
 # Path to the data inside the public bucket
-PATH_SRC = "vision/ocr/sign.jpg"
+PATH_SRC = "vision/logo/google_logo.jpg"
 
 
 with DAG(

Reply via email to