This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d2f6058ace6 Updates validation on --dataflow_endpoint to accept any
legal url. (#27959)
d2f6058ace6 is described below
commit d2f6058ace648c509e2a319544feed71ad2c971c
Author: Kerry Donny-Clark <[email protected]>
AuthorDate: Fri Aug 18 16:58:15 2023 -0400
Updates validation on --dataflow_endpoint to accept any legal url. (#27959)
* Updates validation on --dataflow_endpoint to accept any legal url.
* Adds validators to requirements
* Fixes typo, adds validators to setup.py
* Changes validators version to most recent available.
* Correctly uses generated files for requirements.txt
* Updates 3.9 requirements (skipped by gradle command)
* Removes validators package, and uses basic url validation with urllib.
* Fixes import ordering.
* Removes print in test.
* Updates pipeline options in DataflowRunner test to valid values.
* Removes validation that requires --staging_location to be in gs.
* Fixes tests, small refactor, and brings behavior in line with Cloud
Dataflow documentation.
* Fixes tests.
* Fixes more tests.
* Fixes imports.
---
.../python/apache_beam/options/pipeline_options.py | 38 ++++--
.../apache_beam/options/pipeline_options_test.py | 99 ++++++++++++++
.../options/pipeline_options_validator.py | 35 ++++-
.../options/pipeline_options_validator_test.py | 149 +++++++++++----------
.../runners/dataflow/dataflow_runner_test.py | 6 +-
.../runners/dataflow/template_runner_test.py | 23 ++--
.../python/apache_beam/transforms/external_test.py | 7 +-
7 files changed, 244 insertions(+), 113 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 53233bae27c..6d9c5ecd37b 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -850,24 +850,36 @@ class GoogleCloudOptions(PipelineOptions):
else:
return None
+ def _handle_temp_and_staging_locations(self, validator):
+ """Makes temp_location and staging_location usable GCS paths.
+
+ If exactly one of the two locations fails validation, it is replaced by
+ the valid one. If both fail, both are set to the bucket returned by
+ _create_default_gcs_bucket(); if that returns None, the validation errors
+ for both locations are returned instead.
+
+ Args:
+ validator: PipelineOptionsValidator used to validate the GCS paths.
+
+ Returns:
+ A list of validation errors; empty when usable locations were found.
+ """
+ temp_errors = validator.validate_gcs_path(self, 'temp_location')
+ staging_errors = validator.validate_gcs_path(self, 'staging_location')
+ if not temp_errors and not staging_errors:
+ return []
+ if not staging_errors:
+ # Only temp_location is bad: reuse the valid staging_location.
+ self.temp_location = self.staging_location
+ return []
+ if not temp_errors:
+ # Only staging_location is bad: reuse the valid temp_location.
+ self.staging_location = self.temp_location
+ return []
+ # Both staging and temp locations are bad, try to use default bucket.
+ default_bucket = self._create_default_gcs_bucket()
+ if default_bucket is None:
+ return temp_errors + staging_errors
+ self.temp_location = default_bucket
+ self.staging_location = default_bucket
+ return []
+
def validate(self, validator):
errors = []
if validator.is_service_runner():
+ errors.extend(self._handle_temp_and_staging_locations(validator))
errors.extend(validator.validate_cloud_options(self))
- # Validating temp_location, or adding a default if there are issues
- temp_location_errors = validator.validate_gcs_path(self, 'temp_location')
- if temp_location_errors:
- default_bucket = self._create_default_gcs_bucket()
- if default_bucket is None:
- errors.extend(temp_location_errors)
- else:
- setattr(self, 'temp_location', default_bucket)
-
- if getattr(self, 'staging_location',
- None) or getattr(self, 'temp_location', None) is None:
- errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-
if self.view_as(DebugOptions).dataflow_job_file:
if self.view_as(GoogleCloudOptions).template_location:
errors.append(
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index f83f703e33b..5355e4a7d3b 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -32,6 +32,7 @@ from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import _BeamArgumentParser
+from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.transforms.display import DisplayData
@@ -40,6 +41,25 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
_LOGGER = logging.getLogger(__name__)
+# Mock runners to use for validations.
+class MockRunners(object):
+ class DataflowRunner(object):
+ def get_default_gcp_region(self):
+ # Return a default so we don't have to specify --region in every test
+ # (unless specifically testing it).
+ return 'us-central1'
+
+
+class MockGoogleCloudOptionsNoBucket(GoogleCloudOptions):
+ # GoogleCloudOptions whose default-bucket lookup always yields None, so
+ # invalid temp/staging locations cannot fall back to a default bucket.
+ def _create_default_gcs_bucket(self):
+ return None
+
+
+class MockGoogleCloudOptionsWithBucket(GoogleCloudOptions):
+ # GoogleCloudOptions whose default-bucket lookup always yields a fixed
+ # path; used when both temp and staging locations are invalid.
+ def _create_default_gcs_bucket(self):
+ return "gs://default/bucket"
+
+
class PipelineOptionsTest(unittest.TestCase):
def setUp(self):
# Reset runtime options to avoid side-effects caused by other tests.
@@ -703,6 +723,85 @@ class PipelineOptionsTest(unittest.TestCase):
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
+ def test_validation_good_stg_good_temp(self):
+ runner = MockRunners.DataflowRunner()
+ options = GoogleCloudOptions([
+ '--project=myproject',
+ '--staging_location=gs://beam/stg',
+ '--temp_location=gs://beam/tmp'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, [])
+ self.assertEqual(
+ options.get_all_options()['staging_location'], "gs://beam/stg")
+ self.assertEqual(
+ options.get_all_options()['temp_location'], "gs://beam/tmp")
+
+ def test_validation_bad_stg_good_temp(self):
+ runner = MockRunners.DataflowRunner()
+ options = GoogleCloudOptions([
+ '--project=myproject',
+ '--staging_location=badGSpath',
+ '--temp_location=gs://beam/tmp'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, [])
+ self.assertEqual(
+ options.get_all_options()['staging_location'], "gs://beam/tmp")
+ self.assertEqual(
+ options.get_all_options()['temp_location'], "gs://beam/tmp")
+
+ def test_validation_good_stg_bad_temp(self):
+ runner = MockRunners.DataflowRunner()
+ options = GoogleCloudOptions([
+ '--project=myproject',
+ '--staging_location=gs://beam/stg',
+ '--temp_location=badGSpath'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, [])
+ self.assertEqual(
+ options.get_all_options()['staging_location'], "gs://beam/stg")
+ self.assertEqual(
+ options.get_all_options()['temp_location'], "gs://beam/stg")
+
+ def test_validation_bad_stg_bad_temp_with_default(self):
+ runner = MockRunners.DataflowRunner()
+ options = MockGoogleCloudOptionsWithBucket([
+ '--project=myproject',
+ '--staging_location=badGSpath',
+ '--temp_location=badGSpath'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, [])
+ self.assertEqual(
+ options.get_all_options()['staging_location'], "gs://default/bucket")
+ self.assertEqual(
+ options.get_all_options()['temp_location'], "gs://default/bucket")
+
+ def test_validation_bad_stg_bad_temp_no_default(self):
+ # With no default bucket available, the errors for both locations are
+ # reported.
+ runner = MockRunners.DataflowRunner()
+ options = MockGoogleCloudOptionsNoBucket([
+ '--project=myproject',
+ '--staging_location=badGSpath',
+ '--temp_location=badGSpath'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(len(errors), 2, errors)
+ self.assertIn(
+ 'Invalid GCS path (badGSpath), given for the option: temp_location.',
+ errors,
+ errors)
+ self.assertIn(
+ 'Invalid GCS path (badGSpath), given for the option: staging_location.',
+ errors,
+ errors)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 3f489c0b60c..640569b6293 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -23,6 +23,8 @@ For internal use only; no backwards-compatibility guarantees.
import logging
import re
+import string
+from urllib.parse import urlparse
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
@@ -92,6 +94,8 @@ class PipelineOptionsValidator(object):
ERR_INVALID_PROJECT_ID = (
'Invalid Project ID (%s). Please make sure you specified the Project ID, '
'not project description.')
+ ERR_INVALID_ENDPOINT = (
+ 'Invalid url (%s) for dataflow endpoint. Please provide a valid url.')
ERR_INVALID_NOT_POSITIVE = (
'Invalid value (%s) for option: %s. Value needs '
'to be positive.')
@@ -125,7 +129,6 @@ class PipelineOptionsValidator(object):
JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
PROJECT_NUMBER_PATTERN = '[0-9]*'
- ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
def __init__(self, options, runner):
self.options = options
@@ -154,10 +157,7 @@ class PipelineOptionsValidator(object):
dataflow_endpoint = (
self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
- is_service_endpoint = (
- dataflow_endpoint is not None and
- self.is_full_string_match(self.ENDPOINT_PATTERN, dataflow_endpoint))
-
+ # Any configured (non-None) endpoint now counts as a service endpoint; the
+ # url itself is checked separately by validate_endpoint_url.
+ is_service_endpoint = (dataflow_endpoint is not None)
return is_service_runner and is_service_endpoint
def is_full_string_match(self, pattern, string):
@@ -189,7 +189,6 @@ class PipelineOptionsValidator(object):
return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
-
return []
def validate_cloud_options(self, view):
@@ -231,6 +230,15 @@ class PipelineOptionsValidator(object):
errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'region'))
else:
view.region = default_region
+ # Report the option *name* when it is missing (as done for 'region'
+ # above); an invalid value is reported verbatim.
+ dataflow_endpoint = view.dataflow_endpoint
+ if dataflow_endpoint is None:
+ errors.extend(
+ self._validate_error(self.ERR_MISSING_OPTION, 'dataflow_endpoint'))
+ elif not self.validate_endpoint_url(dataflow_endpoint):
+ errors.extend(
+ self._validate_error(self.ERR_INVALID_ENDPOINT, dataflow_endpoint))
return errors
def validate_sdk_container_image_options(self, view):
@@ -291,7 +299,7 @@ class PipelineOptionsValidator(object):
self._validate_error(
'Cannot use deprecated flag --zone along with worker_region or '
'worker_zone.'))
- if self.options.view_as(DebugOptions).lookup_experiment('worker_region')\
+ if self.options.view_as(DebugOptions).lookup_experiment('worker_region') \
and (view.worker_region or view.worker_zone):
errors.extend(
self._validate_error(
@@ -393,3 +401,16 @@ class PipelineOptionsValidator(object):
return self._validate_error(
self.ERR_REPEATABLE_OPTIONS_NOT_SET_AS_LIST, arg, arg_name)
return []
+
+ def validate_endpoint_url(self, endpoint_url):
+ """Minimally validates a dataflow endpoint url.
+
+ This is not a strict application of http://www.faqs.org/rfcs/rfc1738.html.
+ The url must use the http or https scheme, must not contain user info, and
+ must name a host made only of ASCII letters, digits, '-' and '.'. An
+ optional numeric port (e.g. http://localhost:20281) is accepted.
+
+ Args:
+ endpoint_url: the value given for --dataflow_endpoint.
+
+ Returns:
+ True if the url passes these checks, False otherwise.
+ """
+ url_parts = urlparse(endpoint_url, allow_fragments=False)
+ if url_parts.scheme not in ('http', 'https'):
+ return False
+ if '@' in url_parts.netloc or not url_parts.hostname:
+ return False
+ try:
+ # Raises ValueError for a non-numeric or out-of-range port.
+ _ = url_parts.port
+ except ValueError:
+ return False
+ allowed_chars = set(string.ascii_letters + string.digits + '-.')
+ return set(url_parts.hostname) <= allowed_chars
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index ce37688ef82..653ea112c8c 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -110,69 +110,65 @@ class SetupTest(unittest.TestCase):
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
- test_cases = [
- {
- 'temp_location': None,
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': None,
- 'staging_location': None,
- 'errors': ['staging_location', 'temp_location']
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': None,
- 'errors': []
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': 'gs://ABC/bar',
- 'errors': ['staging_location']
- },
- {
- 'temp_location': 'gcs:/foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': 'gs:/foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': 'gs://ABC/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': 'gs://ABC/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': 'gs://foo',
- 'staging_location': 'gs://foo/bar',
- 'errors': ['temp_location']
- },
- {
- 'temp_location': 'gs://foo/',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- {
- 'temp_location': 'gs://foo/bar',
- 'staging_location': 'gs://foo/bar',
- 'errors': []
- },
- ]
+ # A location that fails validation is replaced by the other location when
+ # that one is valid, so errors are expected only when neither is usable.
+ test_cases = [{
+ 'temp_location': None, 'staging_location': 'gs://foo/bar', 'errors': []
+ },
+ {
+ 'temp_location': None,
+ 'staging_location': None,
+ 'errors': ['staging_location', 'temp_location']
+ },
+ {
+ 'temp_location': 'gs://foo/bar',
+ 'staging_location': None,
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs://foo/bar',
+ 'staging_location': 'gs://ABC/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gcs:/foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs:/foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs://ABC/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs://ABC/bar',
+ 'staging_location': 'gs://BCD/bar',
+ 'errors': ['temp_location', 'staging_location']
+ },
+ {
+ 'temp_location': 'gs://foo',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs://foo/',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ },
+ {
+ 'temp_location': 'gs://foo/bar',
+ 'staging_location': 'gs://foo/bar',
+ 'errors': []
+ }]
for case in test_cases:
errors = get_validator(case['temp_location'],
case['staging_location']).validate()
self.assertEqual(
- self.check_errors_for_arguments(errors, case['errors']), [])
+ self.check_errors_for_arguments(errors, case['errors']), [], case)
def test_project(self):
def get_validator(project):
@@ -318,12 +314,7 @@ class SetupTest(unittest.TestCase):
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://another.service.com'],
- 'expected': False,
- },
- {
- 'runner': MockRunners.DataflowRunner(),
- 'options': ['--dataflow_endpoint=https://another.service.com/'],
- 'expected': False,
+ 'expected': True,
},
{
'runner': MockRunners.DataflowRunner(),
@@ -332,7 +323,7 @@ class SetupTest(unittest.TestCase):
},
{
'runner': MockRunners.DataflowRunner(),
- 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+ 'options': ['--dataflow_endpoint=foo://dataflow.googleapis.com'],
'expected': True,
},
{
@@ -478,20 +469,17 @@ class SetupTest(unittest.TestCase):
self.assertIn('experiment', errors[0])
self.assertIn('worker_region', errors[0])
- def test_experiment_region_and_worker_zone_mutually_exclusive(self):
+ def test_region_and_worker_zone_mutually_exclusive(self):
+ # Passing both --worker_region and --worker_zone must produce exactly one
+ # validation error that names both flags.
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
- '--experiments',
- 'worker_region=us-west1',
- '--worker_zone',
- 'us-east1-b',
+ '--worker_region=us-west1',
+ '--worker_zone=us-east1-b',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
- self.assertIn('experiment', errors[0])
self.assertIn('worker_region', errors[0])
self.assertIn('worker_zone', errors[0])
@@ -556,18 +544,31 @@ class SetupTest(unittest.TestCase):
self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')
def test_region_optional_for_non_service_runner(self):
- runner = MockRunners.DataflowRunner()
+ runner = MockRunners.OtherRunner()
# Remove default region for this test.
runner.get_default_gcp_region = lambda: None
options = PipelineOptions([
'--project=example:example',
'--temp_location=gs://foo/bar',
- '--dataflow_endpoint=http://localhost:20281',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
+ def test_dataflow_endpoint_is_a_url(self):
+ runner = MockRunners.DataflowRunner()
+ # 'foo and bar' has neither a scheme nor a host, so it is not a valid url.
+ options = PipelineOptions([
+ '--project=example:example',
+ '--temp_location=gs://foo/bar',
+ '--staging_location=gs://foo/baz',
+ '--dataflow_endpoint=foo and bar'
+ ])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = validator.validate()
+ self.assertEqual(len(errors), 1, errors)
+ self.assertIn("Invalid url (foo and bar)", errors[0])
+
def test_alias_sdk_container_to_worker_harness(self):
runner = MockRunners.DataflowRunner()
test_image = "SDK_IMAGE"
@@ -924,7 +925,7 @@ class SetupTest(unittest.TestCase):
errors.append(
'Options "%s" had unexpected validation results: "%s"' %
(' '.join(case['options']), ' '.join(validation_errors)))
- self.assertEqual(errors, [])
+ self.assertEqual(errors, [], errors)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 1e084b98278..b58531acc6a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -87,11 +87,11 @@ class SpecialDoFn(beam.DoFn):
class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
def setUp(self):
self.default_properties = [
- '--dataflow_endpoint=ignored',
'--job_name=test-job',
'--project=test-project',
- '--staging_location=ignored',
- '--temp_location=/dev/null',
+ '--region=us-central1',
+ '--staging_location=gs://beam/test',
+ '--temp_location=gs://beam/tmp',
'--no_auth',
'--dry_run=True',
'--sdk_location=container'
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index 021056608f1..792c5cfd165 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -51,17 +51,16 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
dummy_dir = tempfile.mkdtemp()
remote_runner = DataflowRunner()
- with Pipeline(remote_runner,
- options=PipelineOptions(['--dataflow_endpoint=ignored',
- '--sdk_location=' + dummy_file_name,
- '--job_name=test-job',
- '--project=test-project',
- '--staging_location=' + dummy_dir,
- '--temp_location=/dev/null',
- '--template_location=' +
- dummy_file_name,
- '--no_auth'])) as pipeline:
-
+ options = PipelineOptions([
+ '--sdk_location=' + dummy_file_name,
+ '--job_name=test-job',
+ '--project=apache-beam-testing',
+ '--region=us-central1',
+ '--staging_location=gs://apache-beam-testing-stg/stg/',
+ '--temp_location=gs://apache-beam-testing-temp/tmp',
+ '--template_location=' + dummy_file_name
+ ])
+ with Pipeline(remote_runner, options) as pipeline:
pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
with open(dummy_file_name) as template_file:
@@ -69,7 +68,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
self.assertEqual(
saved_job_dict['environment']['sdkPipelineOptions']['options']
['project'],
- 'test-project')
+ 'apache-beam-testing')
self.assertEqual(
saved_job_dict['environment']['sdkPipelineOptions']['options']
['job_name'],
diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py
index d3bb02a7f94..fd5c81de659 100644
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ b/sdks/python/apache_beam/transforms/external_test.py
@@ -202,16 +202,15 @@ class ExternalTransformTest(unittest.TestCase):
@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
def test_pipeline_generation_with_runner_overrides(self):
pipeline_properties = [
- '--dataflow_endpoint=ignored',
'--job_name=test-job',
'--project=test-project',
- '--staging_location=ignored',
- '--temp_location=/dev/null',
+ '--temp_location=gs://beam/tmp',
'--no_auth',
'--dry_run=True',
'--sdk_location=container',
'--runner=DataflowRunner',
- '--streaming'
+ '--streaming',
+ '--region=us-central1'
]
with beam.Pipeline(options=PipelineOptions(pipeline_properties)) as p: