This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81fe195 [BEAM-12751] Set clientRequestId for Dataflow python job creation
new cbbebcd Merge pull request #15335 from [BEAM-12751] Set clientRequestId for Dataflow python job creation
81fe195 is described below
commit 81fe195239cdc399e86950c4c3b67559acda555d
Author: Minbo Bae <[email protected]>
AuthorDate: Sun Aug 15 20:35:33 2021 -0700
[BEAM-12751] Set clientRequestId for Dataflow python job creation
---
.../runners/dataflow/internal/apiclient.py | 28 ++++++++
.../runners/dataflow/internal/apiclient_test.py | 80 ++++++++++++++++++++++
2 files changed, 108 insertions(+)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 9dc75cf..209d131 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -27,6 +27,8 @@ import io
import json
import logging
import os
+import random
+
import pkg_resources
import re
import sys
@@ -496,6 +498,11 @@ class Job(object):
self.proto.labels.additionalProperties.append(
dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+ # Client Request ID
+ self.proto.clientRequestId = '{}-{}'.format(
+ datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
+ random.randrange(9000) + 1000)
+
self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
@@ -794,6 +801,20 @@ class DataflowApplicationClient(object):
self.google_cloud_options.dataflow_endpoint)
_LOGGER.fatal('details of server error: %s', e)
raise
+
+ if response.clientRequestId and \
+ response.clientRequestId != job.proto.clientRequestId:
+ if self.google_cloud_options.update:
+ raise DataflowJobAlreadyExistsError(
+ "The job named %s with id: %s has already been updated into job "
+ "id: %s and cannot be updated again." %
+ (response.name, job.proto.replaceJobId, response.id))
+ else:
+ raise DataflowJobAlreadyExistsError(
+ 'There is already active job named %s with id: %s. If you want to '
+ 'submit a second job, try again by setting a different name using '
+ '--job_name.' % (response.name, response.id))
+
_LOGGER.info('Create job: %s', response)
# The response is a Job proto with the id for the new job.
_LOGGER.info('Created job with id: [%s]', response.id)
@@ -1028,6 +1049,13 @@ class _LegacyDataflowStager(Stager):
return shared_names.BEAM_PACKAGE_NAME
+class DataflowJobAlreadyExistsError(retry.PermanentException):
+ """A non-retryable exception that a job with the given name already
exists."""
+ # Inherits retry.PermanentException to avoid retry in
+ # DataflowApplicationClient.submit_job_description
+ pass
+
+
def to_split_int(n):
res = dataflow.SplitInt64()
res.lowBits = n & 0xffffffff
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index a4f81a3..41e79f3 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1039,6 +1039,86 @@ class UtilTest(unittest.TestCase):
mock.ANY, "dataflow_graph.json", mock.ANY)
client.create_job_description.assert_called_once()
+ def test_create_job_returns_existing_job(self):
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ ])
+ job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+ self.assertTrue(job.proto.clientRequestId) # asserts non-empty string
+ pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+ client = apiclient.DataflowApplicationClient(pipeline_options)
+
+ response = dataflow.Job()
+ # different clientRequestId from `job`
+ response.clientRequestId = "20210821081910123456-1234"
+ response.name = 'test_job_name'
+ response.id = '2021-08-19_21_18_43-9756917246311111021'
+
+ with mock.patch.object(client._client.projects_locations_jobs,
+ 'Create',
+ side_effect=[response]):
+ with mock.patch.object(client, 'create_job_description',
+ side_effect=None):
+ with self.assertRaises(
+ apiclient.DataflowJobAlreadyExistsError) as context:
+ client.create_job(job)
+
+ self.assertEqual(
+ str(context.exception),
+ 'There is already active job named %s with id: %s. If you want to '
+ 'submit a second job, try again by setting a different name using '
+ '--job_name.' % ('test_job_name', response.id))
+
+ def test_update_job_returns_existing_job(self):
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--region',
+ 'us-central1',
+ '--update',
+ ])
+ replace_job_id = '2021-08-21_00_00_01-6081497447916622336'
+ with mock.patch('apache_beam.runners.dataflow.internal.apiclient'
+ '.DataflowApplicationClient.job_id_for_name',
+ return_value=replace_job_id) as job_id_for_name_mock:
+ job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+ job_id_for_name_mock.assert_called_once()
+
+ self.assertTrue(job.proto.clientRequestId) # asserts non-empty string
+
+ pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+ client = apiclient.DataflowApplicationClient(pipeline_options)
+
+ response = dataflow.Job()
+ # different clientRequestId from `job`
+ response.clientRequestId = "20210821083254123456-1234"
+ response.name = 'test_job_name'
+ response.id = '2021-08-19_21_29_07-5725551945600207770'
+
+ with mock.patch.object(client, 'create_job_description', side_effect=None):
+ with mock.patch.object(client._client.projects_locations_jobs,
+ 'Create',
+ side_effect=[response]):
+
+ with self.assertRaises(
+ apiclient.DataflowJobAlreadyExistsError) as context:
+ client.create_job(job)
+
+ self.assertEqual(
+ str(context.exception),
+ 'The job named %s with id: %s has already been updated into job '
+ 'id: %s and cannot be updated again.' %
+ ('test_job_name', replace_job_id, response.id))
+
def test_template_file_generation_with_upload_graph(self):
pipeline_options = PipelineOptions([
'--project',