This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b2d1c60dc77 is_service_runner now returns false with dataflow_endpoint=localhost (#28128)
b2d1c60dc77 is described below
commit b2d1c60dc7710213f21fd4bf3fda0c5d375fb759
Author: Kerry Donny-Clark <[email protected]>
AuthorDate: Wed Aug 23 16:46:22 2023 -0400
is_service_runner now returns false with dataflow_endpoint=localhost (#28128)
---
.../python/apache_beam/options/pipeline_options_validator.py | 10 ++++++++--
.../apache_beam/options/pipeline_options_validator_test.py | 12 +++++++++++-
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 640569b6293..7c07e5a1e6c 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -157,8 +157,13 @@ class PipelineOptionsValidator(object):
dataflow_endpoint = (
self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
- is_service_endpoint = (dataflow_endpoint is not None)
- return is_service_runner and is_service_endpoint
+ if dataflow_endpoint is None:
+ return False
+ else:
+ endpoint_parts = urlparse(dataflow_endpoint, allow_fragments=False)
+ if endpoint_parts.netloc.startswith("localhost"):
+ return False
+ return is_service_runner
def is_full_string_match(self, pattern, string):
"""Returns True if the pattern matches the whole string."""
@@ -404,6 +409,7 @@ class PipelineOptionsValidator(object):
# Minimally validates the endpoint url. This is not a strict application
# of http://www.faqs.org/rfcs/rfc1738.html.
+ # If the url matches localhost, set
def validate_endpoint_url(self, endpoint_url):
url_parts = urlparse(endpoint_url, allow_fragments=False)
if not url_parts.scheme or not url_parts.netloc:
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 653ea112c8c..1f4c8be226d 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -311,6 +311,11 @@ class SetupTest(unittest.TestCase):
'options':
['--dataflow_endpoint=https://dataflow.googleapis.com/'],
'expected': False,
},
+ {
+ 'runner': MockRunners.DataflowRunner(),
+ 'options': [],
+ 'expected': True,
+ },
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://another.service.com'],
@@ -321,6 +326,11 @@ class SetupTest(unittest.TestCase):
'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
'expected': True,
},
+ {
+ 'runner': MockRunners.DataflowRunner(),
+ 'options': ['--dataflow_endpoint=http://localhost:1000'],
+ 'expected': False,
+ },
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=foo: //dataflow. googleapis.
com'],
@@ -336,7 +346,7 @@ class SetupTest(unittest.TestCase):
for case in test_cases:
validator = PipelineOptionsValidator(
PipelineOptions(case['options']), case['runner'])
- self.assertEqual(validator.is_service_runner(), case['expected'])
+ self.assertEqual(validator.is_service_runner(), case['expected'], case)
def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
runner = MockRunners.OtherRunner()