This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d2f6058ace6 Updates validation on --dataflow_endpoint to accept any legal url. (#27959)
d2f6058ace6 is described below

commit d2f6058ace648c509e2a319544feed71ad2c971c
Author: Kerry Donny-Clark <[email protected]>
AuthorDate: Fri Aug 18 16:58:15 2023 -0400

    Updates validation on --dataflow_endpoint to accept any legal url. (#27959)
    
    * Updates validation on --dataflow_endpoint to accept any legal url.
    
    * Adds validators to requirements
    
    * Fixes typo, adds validators to setup.py
    
    * Changes validators version to most recent available.
    
    * Correctly uses generated files for requirements.txt
    
    * Updates the Python 3.9 requirements (skipped by the Gradle command).
    
    * Removes the validators package and uses basic URL validation with urllib.
    
    * Fixes import ordering.
    
    * Removes print in test.
    
    * Updates pipeline options in DataflowRunner test to valid values.
    
    * Removes the validation that requires --staging_location to be a GCS (gs://) path.
    
    * Fixes tests, makes a small refactor, and brings behavior in line with the Cloud Dataflow documentation.
    
    * Fixes tests.
    
    * Fixes more tests.
    
    * Fixes imports.
---
 .../python/apache_beam/options/pipeline_options.py |  38 ++++--
 .../apache_beam/options/pipeline_options_test.py   |  99 ++++++++++++++
 .../options/pipeline_options_validator.py          |  35 ++++-
 .../options/pipeline_options_validator_test.py     | 149 +++++++++++----------
 .../runners/dataflow/dataflow_runner_test.py       |   6 +-
 .../runners/dataflow/template_runner_test.py       |  23 ++--
 .../python/apache_beam/transforms/external_test.py |   7 +-
 7 files changed, 244 insertions(+), 113 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index 53233bae27c..6d9c5ecd37b 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -850,24 +850,36 @@ class GoogleCloudOptions(PipelineOptions):
     else:
       return None
 
+  # If either temp or staging location has an issue, we use the valid one for
+  # both locations. If both are bad we return an error.
+  def _handle_temp_and_staging_locations(self, validator):
+    temp_errors = validator.validate_gcs_path(self, 'temp_location')
+    staging_errors = validator.validate_gcs_path(self, 'staging_location')
+    if temp_errors and not staging_errors:
+      setattr(self, 'temp_location', getattr(self, 'staging_location'))
+      return []
+    elif staging_errors and not temp_errors:
+      setattr(self, 'staging_location', getattr(self, 'temp_location'))
+      return []
+    elif not staging_errors and not temp_errors:
+      return []
+    # Both staging and temp locations are bad, try to use default bucket.
+    else:
+      default_bucket = self._create_default_gcs_bucket()
+      if default_bucket is None:
+        temp_errors.extend(staging_errors)
+        return temp_errors
+      else:
+        setattr(self, 'temp_location', default_bucket)
+        setattr(self, 'staging_location', default_bucket)
+        return []
+
   def validate(self, validator):
     errors = []
     if validator.is_service_runner():
+      errors.extend(self._handle_temp_and_staging_locations(validator))
       errors.extend(validator.validate_cloud_options(self))
 
-      # Validating temp_location, or adding a default if there are issues
-      temp_location_errors = validator.validate_gcs_path(self, 'temp_location')
-      if temp_location_errors:
-        default_bucket = self._create_default_gcs_bucket()
-        if default_bucket is None:
-          errors.extend(temp_location_errors)
-        else:
-          setattr(self, 'temp_location', default_bucket)
-
-      if getattr(self, 'staging_location',
-                 None) or getattr(self, 'temp_location', None) is None:
-        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-
     if self.view_as(DebugOptions).dataflow_job_file:
       if self.view_as(GoogleCloudOptions).template_location:
         errors.append(
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py 
b/sdks/python/apache_beam/options/pipeline_options_test.py
index f83f703e33b..5355e4a7d3b 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -32,6 +32,7 @@ from apache_beam.options.pipeline_options import 
ProfilingOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.options.pipeline_options import _BeamArgumentParser
+from apache_beam.options.pipeline_options_validator import 
PipelineOptionsValidator
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.transforms.display import DisplayData
@@ -40,6 +41,25 @@ from apache_beam.transforms.display_test import 
DisplayDataItemMatcher
 _LOGGER = logging.getLogger(__name__)
 
 
+# Mock runners to use for validations.
+class MockRunners(object):
+  class DataflowRunner(object):
+    def get_default_gcp_region(self):
+      # Return a default so we don't have to specify --region in every test
+      # (unless specifically testing it).
+      return 'us-central1'
+
+
+class MockGoogleCloudOptionsNoBucket(GoogleCloudOptions):
+  def _create_default_gcs_bucket(self):
+    return None
+
+
+class MockGoogleCloudOptionsWithBucket(GoogleCloudOptions):
+  def _create_default_gcs_bucket(self):
+    return "gs://default/bucket"
+
+
 class PipelineOptionsTest(unittest.TestCase):
   def setUp(self):
     # Reset runtime options to avoid side-effects caused by other tests.
@@ -703,6 +723,85 @@ class PipelineOptionsTest(unittest.TestCase):
         "the dest and the flag name to the map "
         "_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
 
+  def test_validation_good_stg_good_temp(self):
+    runner = MockRunners.DataflowRunner()
+    options = GoogleCloudOptions([
+        '--project=myproject',
+        '--staging_location=gs://beam/stg',
+        '--temp_location=gs://beam/tmp'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = options._handle_temp_and_staging_locations(validator)
+    self.assertEqual(errors, [])
+    self.assertEqual(
+        options.get_all_options()['staging_location'], "gs://beam/stg")
+    self.assertEqual(
+        options.get_all_options()['temp_location'], "gs://beam/tmp")
+
+  def test_validation_bad_stg_good_temp(self):
+    runner = MockRunners.DataflowRunner()
+    options = GoogleCloudOptions([
+        '--project=myproject',
+        '--staging_location=badGSpath',
+        '--temp_location=gs://beam/tmp'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = options._handle_temp_and_staging_locations(validator)
+    self.assertEqual(errors, [])
+    self.assertEqual(
+        options.get_all_options()['staging_location'], "gs://beam/tmp")
+    self.assertEqual(
+        options.get_all_options()['temp_location'], "gs://beam/tmp")
+
+  def test_validation_good_stg_bad_temp(self):
+    runner = MockRunners.DataflowRunner()
+    options = GoogleCloudOptions([
+        '--project=myproject',
+        '--staging_location=gs://beam/stg',
+        '--temp_location=badGSpath'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = options._handle_temp_and_staging_locations(validator)
+    self.assertEqual(errors, [])
+    self.assertEqual(
+        options.get_all_options()['staging_location'], "gs://beam/stg")
+    self.assertEqual(
+        options.get_all_options()['temp_location'], "gs://beam/stg")
+
+  def test_validation_bad_stg_bad_temp_with_default(self):
+    runner = MockRunners.DataflowRunner()
+    options = MockGoogleCloudOptionsWithBucket([
+        '--project=myproject',
+        '--staging_location=badGSpath',
+        '--temp_location=badGSpath'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = options._handle_temp_and_staging_locations(validator)
+    self.assertEqual(errors, [])
+    self.assertEqual(
+        options.get_all_options()['staging_location'], "gs://default/bucket")
+    self.assertEqual(
+        options.get_all_options()['temp_location'], "gs://default/bucket")
+
+  def test_validation_bad_stg_bad_temp_no_default(self):
+    runner = MockRunners.DataflowRunner()
+    options = MockGoogleCloudOptionsNoBucket([
+        '--project=myproject',
+        '--staging_location=badGSpath',
+        '--temp_location=badGSpath'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = options._handle_temp_and_staging_locations(validator)
+    self.assertEqual(len(errors), 2, errors)
+    self.assertIn(
+        'Invalid GCS path (badGSpath), given for the option: temp_location.',
+        errors,
+        errors)
+    self.assertIn(
+        'Invalid GCS path (badGSpath), given for the option: 
staging_location.',
+        errors,
+        errors)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py 
b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 3f489c0b60c..640569b6293 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -23,6 +23,8 @@ For internal use only; no backwards-compatibility guarantees.
 
 import logging
 import re
+import string
+from urllib.parse import urlparse
 
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import DebugOptions
@@ -92,6 +94,8 @@ class PipelineOptionsValidator(object):
   ERR_INVALID_PROJECT_ID = (
       'Invalid Project ID (%s). Please make sure you specified the Project ID, 
'
       'not project description.')
+  ERR_INVALID_ENDPOINT = (
+      'Invalid url (%s) for dataflow endpoint. Please provide a valid url.')
   ERR_INVALID_NOT_POSITIVE = (
       'Invalid value (%s) for option: %s. Value needs '
       'to be positive.')
@@ -125,7 +129,6 @@ class PipelineOptionsValidator(object):
   JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
   PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
   PROJECT_NUMBER_PATTERN = '[0-9]*'
-  ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
 
   def __init__(self, options, runner):
     self.options = options
@@ -154,10 +157,7 @@ class PipelineOptionsValidator(object):
 
     dataflow_endpoint = (
         self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
-    is_service_endpoint = (
-        dataflow_endpoint is not None and
-        self.is_full_string_match(self.ENDPOINT_PATTERN, dataflow_endpoint))
-
+    is_service_endpoint = (dataflow_endpoint is not None)
     return is_service_runner and is_service_endpoint
 
   def is_full_string_match(self, pattern, string):
@@ -189,7 +189,6 @@ class PipelineOptionsValidator(object):
       return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
     if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
       return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
-
     return []
 
   def validate_cloud_options(self, view):
@@ -231,6 +230,15 @@ class PipelineOptionsValidator(object):
         errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'region'))
       else:
         view.region = default_region
+    dataflow_endpoint = view.dataflow_endpoint
+    if dataflow_endpoint is None:
+      errors.extend(
+          self._validate_error(self.ERR_MISSING_OPTION, dataflow_endpoint))
+    else:
+      valid_endpoint = self.validate_endpoint_url(dataflow_endpoint)
+      if valid_endpoint is False:
+        errors.extend(
+            self._validate_error(self.ERR_INVALID_ENDPOINT, dataflow_endpoint))
     return errors
 
   def validate_sdk_container_image_options(self, view):
@@ -291,7 +299,7 @@ class PipelineOptionsValidator(object):
           self._validate_error(
               'Cannot use deprecated flag --zone along with worker_region or '
               'worker_zone.'))
-    if self.options.view_as(DebugOptions).lookup_experiment('worker_region')\
+    if self.options.view_as(DebugOptions).lookup_experiment('worker_region') \
         and (view.worker_region or view.worker_zone):
       errors.extend(
           self._validate_error(
@@ -393,3 +401,16 @@ class PipelineOptionsValidator(object):
       return self._validate_error(
           self.ERR_REPEATABLE_OPTIONS_NOT_SET_AS_LIST, arg, arg_name)
     return []
+
+  # Minimally validates the endpoint url. This is not a strict application
+  # of http://www.faqs.org/rfcs/rfc1738.html.
+  def validate_endpoint_url(self, endpoint_url):
+    url_parts = urlparse(endpoint_url, allow_fragments=False)
+    if not url_parts.scheme or not url_parts.netloc:
+      return False
+    if url_parts.scheme not in ['http', 'https']:
+      return False
+    if set(
+        url_parts.netloc) <= set(string.ascii_letters + string.digits + '-.'):
+      return True
+    return False
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py 
b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index ce37688ef82..653ea112c8c 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -110,69 +110,65 @@ class SetupTest(unittest.TestCase):
       validator = PipelineOptionsValidator(pipeline_options, runner)
       return validator
 
-    test_cases = [
-        {
-            'temp_location': None,
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': None,
-            'staging_location': None,
-            'errors': ['staging_location', 'temp_location']
-        },
-        {
-            'temp_location': 'gs://foo/bar',
-            'staging_location': None,
-            'errors': []
-        },
-        {
-            'temp_location': 'gs://foo/bar',
-            'staging_location': 'gs://ABC/bar',
-            'errors': ['staging_location']
-        },
-        {
-            'temp_location': 'gcs:/foo/bar',
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': 'gs:/foo/bar',
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': 'gs://ABC/bar',
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': 'gs://ABC/bar',
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': 'gs://foo',
-            'staging_location': 'gs://foo/bar',
-            'errors': ['temp_location']
-        },
-        {
-            'temp_location': 'gs://foo/',
-            'staging_location': 'gs://foo/bar',
-            'errors': []
-        },
-        {
-            'temp_location': 'gs://foo/bar',
-            'staging_location': 'gs://foo/bar',
-            'errors': []
-        },
-    ]
+    test_cases = [{
+        'temp_location': None, 'staging_location': 'gs://foo/bar', 'errors': []
+    },
+                  {
+                      'temp_location': None,
+                      'staging_location': None,
+                      'errors': ['staging_location', 'temp_location']
+                  },
+                  {
+                      'temp_location': 'gs://foo/bar',
+                      'staging_location': None,
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs://foo/bar',
+                      'staging_location': 'gs://ABC/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gcs:/foo/bar',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs:/foo/bar',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs://ABC/bar',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs://ABC/bar',
+                      'staging_location': 'gs://BCD/bar',
+                      'errors': ['temp_location', 'staging_location']
+                  },
+                  {
+                      'temp_location': 'gs://foo',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs://foo/',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  },
+                  {
+                      'temp_location': 'gs://foo/bar',
+                      'staging_location': 'gs://foo/bar',
+                      'errors': []
+                  }]
 
     for case in test_cases:
       errors = get_validator(case['temp_location'],
                              case['staging_location']).validate()
       self.assertEqual(
-          self.check_errors_for_arguments(errors, case['errors']), [])
+          self.check_errors_for_arguments(errors, case['errors']), [], case)
 
   def test_project(self):
     def get_validator(project):
@@ -318,12 +314,7 @@ class SetupTest(unittest.TestCase):
         {
             'runner': MockRunners.DataflowRunner(),
             'options': ['--dataflow_endpoint=https://another.service.com'],
-            'expected': False,
-        },
-        {
-            'runner': MockRunners.DataflowRunner(),
-            'options': ['--dataflow_endpoint=https://another.service.com/'],
-            'expected': False,
+            'expected': True,
         },
         {
             'runner': MockRunners.DataflowRunner(),
@@ -332,7 +323,7 @@ class SetupTest(unittest.TestCase):
         },
         {
             'runner': MockRunners.DataflowRunner(),
-            'options': 
['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+            'options': ['--dataflow_endpoint=foo: //dataflow. googleapis. 
com'],
             'expected': True,
         },
         {
@@ -478,20 +469,17 @@ class SetupTest(unittest.TestCase):
     self.assertIn('experiment', errors[0])
     self.assertIn('worker_region', errors[0])
 
-  def test_experiment_region_and_worker_zone_mutually_exclusive(self):
+  def test_region_and_worker_zone_mutually_exclusive(self):
     runner = MockRunners.DataflowRunner()
     options = PipelineOptions([
-        '--experiments',
-        'worker_region=us-west1',
-        '--worker_zone',
-        'us-east1-b',
+        '--worker_region=us-west1',
+        '--worker_zone=us-east1-b',
         '--project=example:example',
         '--temp_location=gs://foo/bar',
     ])
     validator = PipelineOptionsValidator(options, runner)
     errors = validator.validate()
     self.assertEqual(len(errors), 1)
-    self.assertIn('experiment', errors[0])
     self.assertIn('worker_region', errors[0])
     self.assertIn('worker_zone', errors[0])
 
@@ -556,18 +544,31 @@ class SetupTest(unittest.TestCase):
     self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')
 
   def test_region_optional_for_non_service_runner(self):
-    runner = MockRunners.DataflowRunner()
+    runner = MockRunners.OtherRunner()
     # Remove default region for this test.
     runner.get_default_gcp_region = lambda: None
     options = PipelineOptions([
         '--project=example:example',
         '--temp_location=gs://foo/bar',
-        '--dataflow_endpoint=http://localhost:20281',
     ])
     validator = PipelineOptionsValidator(options, runner)
     errors = validator.validate()
     self.assertEqual(len(errors), 0)
 
+  def test_dataflow_endpoint_is_a_url(self):
+    runner = MockRunners.DataflowRunner()
+    # Remove default region for this test.
+    options = PipelineOptions([
+        '--project=example:example',
+        '--temp_location=gs://foo/bar',
+        '--staging_location=gs://foo/baz',
+        '--dataflow_endpoint=foo and bar'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertEqual(len(errors), 1, errors)
+    self.assertIn("Invalid url (foo and bar)", errors[0])
+
   def test_alias_sdk_container_to_worker_harness(self):
     runner = MockRunners.DataflowRunner()
     test_image = "SDK_IMAGE"
@@ -924,7 +925,7 @@ class SetupTest(unittest.TestCase):
         errors.append(
             'Options "%s" had unexpected validation results: "%s"' %
             (' '.join(case['options']), ' '.join(validation_errors)))
-    self.assertEqual(errors, [])
+    self.assertEqual(errors, [], errors)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 1e084b98278..b58531acc6a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -87,11 +87,11 @@ class SpecialDoFn(beam.DoFn):
 class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def setUp(self):
     self.default_properties = [
-        '--dataflow_endpoint=ignored',
         '--job_name=test-job',
         '--project=test-project',
-        '--staging_location=ignored',
-        '--temp_location=/dev/null',
+        '--region=us-central1'
+        '--staging_location=gs://beam/test',
+        '--temp_location=gs://beam/tmp',
         '--no_auth',
         '--dry_run=True',
         '--sdk_location=container'
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py 
b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index 021056608f1..792c5cfd165 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -51,17 +51,16 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
     dummy_dir = tempfile.mkdtemp()
 
     remote_runner = DataflowRunner()
-    with Pipeline(remote_runner,
-                  options=PipelineOptions(['--dataflow_endpoint=ignored',
-                                           '--sdk_location=' + dummy_file_name,
-                                           '--job_name=test-job',
-                                           '--project=test-project',
-                                           '--staging_location=' + dummy_dir,
-                                           '--temp_location=/dev/null',
-                                           '--template_location=' +
-                                           dummy_file_name,
-                                           '--no_auth'])) as pipeline:
-
+    options = PipelineOptions([
+        '--sdk_location=' + dummy_file_name,
+        '--job_name=test-job',
+        '--project=apache-beam-testing',
+        '--region=us-central1',
+        '--staging_location=gs://apache-beam-testing-stg/stg/',
+        '--temp_location=gs://apache-beam-testing-temp/tmp',
+        '--template_location=' + dummy_file_name
+    ])
+    with Pipeline(remote_runner, options) as pipeline:
       pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x)  # pylint: 
disable=expression-not-assigned
 
     with open(dummy_file_name) as template_file:
@@ -69,7 +68,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
       self.assertEqual(
           saved_job_dict['environment']['sdkPipelineOptions']['options']
           ['project'],
-          'test-project')
+          'apache-beam-testing')
       self.assertEqual(
           saved_job_dict['environment']['sdkPipelineOptions']['options']
           ['job_name'],
diff --git a/sdks/python/apache_beam/transforms/external_test.py 
b/sdks/python/apache_beam/transforms/external_test.py
index d3bb02a7f94..fd5c81de659 100644
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ b/sdks/python/apache_beam/transforms/external_test.py
@@ -202,16 +202,15 @@ class ExternalTransformTest(unittest.TestCase):
   @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
   def test_pipeline_generation_with_runner_overrides(self):
     pipeline_properties = [
-        '--dataflow_endpoint=ignored',
         '--job_name=test-job',
         '--project=test-project',
-        '--staging_location=ignored',
-        '--temp_location=/dev/null',
+        '--temp_location=gs://beam/tmp',
         '--no_auth',
         '--dry_run=True',
         '--sdk_location=container',
         '--runner=DataflowRunner',
-        '--streaming'
+        '--streaming',
+        '--region=us-central1'
     ]
 
     with beam.Pipeline(options=PipelineOptions(pipeline_properties)) as p:

Reply via email to