http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py deleted file mode 100644 index 4dda47a..0000000 --- a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers.py +++ /dev/null @@ -1,124 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from hamcrest.core.base_matcher import BaseMatcher - - -IGNORED = object() - - -class MetricStructuredNameMatcher(BaseMatcher): - """Matches a MetricStructuredName.""" - def __init__(self, - name=IGNORED, - origin=IGNORED, - context=IGNORED): - """Creates a MetricsStructuredNameMatcher. - - Any property not passed in to the constructor will be ignored when matching. - - Args: - name: A string with the metric name. - origin: A string with the metric namespace. - context: A key:value dictionary that will be matched to the - structured name. - """ - if context != IGNORED and not isinstance(context, dict): - raise ValueError('context must be a Python dictionary.') - - self.name = name - self.origin = origin - self.context = context - - def _matches(self, item): - if self.name != IGNORED and item.name != self.name: - return False - if self.origin != IGNORED and item.origin != self.origin: - return False - if self.context != IGNORED: - for key, name in self.context.iteritems(): - if key not in item.context: - return False - if name != IGNORED and item.context[key] != name: - return False - return True - - def describe_to(self, description): - descriptors = [] - if self.name != IGNORED: - descriptors.append('name is {}'.format(self.name)) - if self.origin != IGNORED: - descriptors.append('origin is {}'.format(self.origin)) - if self.context != IGNORED: - descriptors.append('context is ({})'.format(str(self.context))) - - item_description = ' and '.join(descriptors) - description.append(item_description) - - -class MetricUpdateMatcher(BaseMatcher): - """Matches a metrics update protocol buffer.""" - def __init__(self, - cumulative=IGNORED, - name=IGNORED, - scalar=IGNORED, - kind=IGNORED): - """Creates a MetricUpdateMatcher. - - Any property not passed in to the constructor will be ignored when matching. - - Args: - cumulative: A boolean. - name: A MetricStructuredNameMatcher object that matches the name. - scalar: An integer with the metric update. - kind: A string defining the kind of counter. - """ - if name != IGNORED and not isinstance(name, MetricStructuredNameMatcher): - raise ValueError('name must be a MetricStructuredNameMatcher.') - - self.cumulative = cumulative - self.name = name - self.scalar = scalar - self.kind = kind - - def _matches(self, item): - if self.cumulative != IGNORED and item.cumulative != self.cumulative: - return False - if self.name != IGNORED and not self.name._matches(item.name): - return False - if self.kind != IGNORED and item.kind != self.kind: - return False - if self.scalar != IGNORED: - value_property = [p - for p in item.scalar.object_value.properties - if p.key == 'value'] - int_value = value_property[0].value.integer_value - if self.scalar != int_value: - return False - return True - - def describe_to(self, description): - descriptors = [] - if self.cumulative != IGNORED: - descriptors.append('cumulative is {}'.format(self.cumulative)) - if self.name != IGNORED: - descriptors.append('name is {}'.format(self.name)) - if self.scalar != IGNORED: - descriptors.append('scalar is ({})'.format(str(self.scalar))) - - item_description = ' and '.join(descriptors) - description.append(item_description)
http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py b/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py deleted file mode 100644 index ec63ce7..0000000 --- a/sdks/python/apache_beam/internal/clients/dataflow/message_matchers_test.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import unittest - -import hamcrest as hc - -import apache_beam.internal.clients.dataflow as dataflow -from apache_beam.internal.clients.dataflow import message_matchers -from apache_beam.internal.json_value import to_json_value - - -class TestMatchers(unittest.TestCase): - - def test_structured_name_matcher_basic(self): - metric_name = dataflow.MetricStructuredName() - metric_name.name = 'metric1' - metric_name.origin = 'origin2' - - matcher = message_matchers.MetricStructuredNameMatcher( - name='metric1', - origin='origin2') - hc.assert_that(metric_name, hc.is_(matcher)) - with self.assertRaises(AssertionError): - matcher = message_matchers.MetricStructuredNameMatcher( - name='metric1', - origin='origin1') - hc.assert_that(metric_name, hc.is_(matcher)) - - def test_metric_update_basic(self): - metric_update = dataflow.MetricUpdate() - metric_update.name = dataflow.MetricStructuredName() - metric_update.name.name = 'metric1' - metric_update.name.origin = 'origin1' - - metric_update.cumulative = False - metric_update.kind = 'sum' - metric_update.scalar = to_json_value(1, with_type=True) - - name_matcher = message_matchers.MetricStructuredNameMatcher( - name='metric1', - origin='origin1') - matcher = message_matchers.MetricUpdateMatcher( - name=name_matcher, - kind='sum', - scalar=1) - - hc.assert_that(metric_update, hc.is_(matcher)) - - with self.assertRaises(AssertionError): - matcher.kind = 'suma' - hc.assert_that(metric_update, hc.is_(matcher)) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py index 1507d88..760935d 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner.py @@ -33,17 +33,17 @@ from apache_beam.internal import json_value from apache_beam.internal import pickler from apache_beam.pvalue import PCollectionView from apache_beam.runners.google_cloud_dataflow.dataflow_metrics import DataflowMetrics +from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api +from apache_beam.runners.runner import PValueCache from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.utils import names from apache_beam.utils.names import PropertyNames from apache_beam.utils.names import TransformNames from apache_beam.utils.pipeline_options import StandardOptions -from apache_beam.internal.clients import dataflow as dataflow_api class DataflowRunner(PipelineRunner): @@ -154,7 +154,7 @@ class DataflowRunner(PipelineRunner): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.internal import apiclient + from apache_beam.runners.google_cloud_dataflow.internal import apiclient self.job = apiclient.Job(pipeline.options) # The superclass's run will trigger a traversal of all reachable nodes. @@ -235,7 +235,7 @@ class DataflowRunner(PipelineRunner): """Creates a Step object and adds it to the cache.""" # Import here to avoid adding the dependency for local running scenarios. # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.internal import apiclient + from apache_beam.runners.google_cloud_dataflow.internal import apiclient step = apiclient.Step(step_kind, self._get_unique_step_name()) self.job.proto.steps.append(step.proto) step.add_property(PropertyNames.USER_NAME, step_label) http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py index e9b5aff..59bd2fc 100644 --- a/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/dataflow_runner_test.py @@ -21,9 +21,9 @@ import unittest import mock -from apache_beam.internal.clients import dataflow as dataflow_api from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRuntimeException +from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow as dataflow_api class DataflowRunnerTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py new file mode 100644 index 0000000..db761ea --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient.py @@ -0,0 +1,726 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Dataflow client utility functions.""" + +import codecs +import getpass +import json +import logging +import os +import re +import time +from StringIO import StringIO +from datetime import datetime + +from apitools.base.py import encoding +from apitools.base.py import exceptions + +from apache_beam import utils +from apache_beam.internal.auth import get_service_credentials +from apache_beam.internal.clients import storage +from apache_beam.internal.json_value import to_json_value +from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow +from apache_beam.transforms import cy_combiners +from apache_beam.transforms.display import DisplayData +from apache_beam.utils import dependency +from apache_beam.utils import retry +from apache_beam.utils.dependency import get_required_container_version +from apache_beam.utils.dependency import get_sdk_name_and_version +from apache_beam.utils.names import PropertyNames +from apache_beam.utils.pipeline_options import DebugOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import StandardOptions +from apache_beam.utils.pipeline_options import WorkerOptions + + +class Step(object): + """Wrapper for a dataflow Step protobuf.""" + + def __init__(self, step_kind, step_name, additional_properties=None): + self.step_kind = step_kind + self.step_name = step_name + self.proto = dataflow.Step(kind=step_kind, name=step_name) + self.proto.properties = {} + self._additional_properties = [] + + if additional_properties is not None: + for (n, v, t) in additional_properties: + self.add_property(n, v, t) + + def add_property(self, name, value, with_type=False): + self._additional_properties.append((name, value, with_type)) + self.proto.properties.additionalProperties.append( + dataflow.Step.PropertiesValue.AdditionalProperty( + key=name, value=to_json_value(value, with_type=with_type))) + + def _get_outputs(self): + """Returns a list of all output labels for a step.""" + outputs = [] + for p in self.proto.properties.additionalProperties: + if p.key == PropertyNames.OUTPUT_INFO: + for entry in p.value.array_value.entries: + for entry_prop in entry.object_value.properties: + if entry_prop.key == PropertyNames.OUTPUT_NAME: + outputs.append(entry_prop.value.string_value) + return outputs + + def __reduce__(self): + """Reduce hook for pickling the Step class more easily.""" + return (Step, (self.step_kind, self.step_name, self._additional_properties)) + + def get_output(self, tag=None): + """Returns name if it is one of the outputs or first output if name is None. + + Args: + tag: tag of the output as a string or None if we want to get the + name of the first output. + + Returns: + The name of the output associated with the tag or the first output + if tag was None. + + Raises: + ValueError: if the tag does not exist within outputs. + """ + outputs = self._get_outputs() + if tag is None: + return outputs[0] + else: + name = '%s_%s' % (PropertyNames.OUT, tag) + if name not in outputs: + raise ValueError( + 'Cannot find named output: %s in %s.' % (name, outputs)) + return name + + +class Environment(object): + """Wrapper for a dataflow Environment protobuf.""" + + def __init__(self, packages, options, environment_version): + self.standard_options = options.view_as(StandardOptions) + self.google_cloud_options = options.view_as(GoogleCloudOptions) + self.worker_options = options.view_as(WorkerOptions) + self.debug_options = options.view_as(DebugOptions) + self.proto = dataflow.Environment() + self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE + self.proto.dataset = '{}/cloud_dataflow'.format( + GoogleCloudOptions.BIGQUERY_API_SERVICE) + self.proto.tempStoragePrefix = ( + self.google_cloud_options.temp_location.replace( + 'gs:/', + GoogleCloudOptions.STORAGE_API_SERVICE)) + # User agent information. + self.proto.userAgent = dataflow.Environment.UserAgentValue() + self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint + + if self.google_cloud_options.service_account_email: + self.proto.serviceAccountEmail = ( + self.google_cloud_options.service_account_email) + + sdk_name, version_string = get_sdk_name_and_version() + + self.proto.userAgent.additionalProperties.extend([ + dataflow.Environment.UserAgentValue.AdditionalProperty( + key='name', + value=to_json_value(sdk_name)), + dataflow.Environment.UserAgentValue.AdditionalProperty( + key='version', value=to_json_value(version_string))]) + # Version information. + self.proto.version = dataflow.Environment.VersionValue() + if self.standard_options.streaming: + job_type = 'PYTHON_STREAMING' + else: + job_type = 'PYTHON_BATCH' + self.proto.version.additionalProperties.extend([ + dataflow.Environment.VersionValue.AdditionalProperty( + key='job_type', + value=to_json_value(job_type)), + dataflow.Environment.VersionValue.AdditionalProperty( + key='major', value=to_json_value(environment_version))]) + # Experiments + if self.debug_options.experiments: + for experiment in self.debug_options.experiments: + self.proto.experiments.append(experiment) + # Worker pool(s) information. + package_descriptors = [] + for package in packages: + package_descriptors.append( + dataflow.Package( + location='%s/%s' % ( + self.google_cloud_options.staging_location.replace( + 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), + package), + name=package)) + + pool = dataflow.WorkerPool( + kind='local' if self.local else 'harness', + packages=package_descriptors, + taskrunnerSettings=dataflow.TaskRunnerSettings( + parallelWorkerSettings=dataflow.WorkerSettings( + baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, + servicePath=self.google_cloud_options.dataflow_endpoint))) + pool.autoscalingSettings = dataflow.AutoscalingSettings() + # Set worker pool options received through command line. + if self.worker_options.num_workers: + pool.numWorkers = self.worker_options.num_workers + if self.worker_options.max_num_workers: + pool.autoscalingSettings.maxNumWorkers = ( + self.worker_options.max_num_workers) + if self.worker_options.autoscaling_algorithm: + values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum + pool.autoscalingSettings.algorithm = { + 'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE, + 'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC, + }.get(self.worker_options.autoscaling_algorithm) + if self.worker_options.machine_type: + pool.machineType = self.worker_options.machine_type + if self.worker_options.disk_size_gb: + pool.diskSizeGb = self.worker_options.disk_size_gb + if self.worker_options.disk_type: + pool.diskType = self.worker_options.disk_type + if self.worker_options.zone: + pool.zone = self.worker_options.zone + if self.worker_options.network: + pool.network = self.worker_options.network + if self.worker_options.worker_harness_container_image: + pool.workerHarnessContainerImage = ( + self.worker_options.worker_harness_container_image) + else: + # Default to using the worker harness container image for the current SDK + # version. + pool.workerHarnessContainerImage = ( + 'dataflow.gcr.io/v1beta3/python:%s' % + get_required_container_version()) + if self.worker_options.use_public_ips is not None: + if self.worker_options.use_public_ips: + pool.ipConfiguration = ( + dataflow.WorkerPool + .IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC) + else: + pool.ipConfiguration = ( + dataflow.WorkerPool + .IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE) + + if self.standard_options.streaming: + # Use separate data disk for streaming. + disk = dataflow.Disk() + if self.local: + disk.diskType = 'local' + # TODO(ccy): allow customization of disk. + pool.dataDisks.append(disk) + self.proto.workerPools.append(pool) + + sdk_pipeline_options = options.get_all_options() + if sdk_pipeline_options: + self.proto.sdkPipelineOptions = ( + dataflow.Environment.SdkPipelineOptionsValue()) + + options_dict = {k: v + for k, v in sdk_pipeline_options.iteritems() + if v is not None} + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='options', value=to_json_value(options_dict))) + + dd = DisplayData.create_from_options(options) + items = [item.get_dict() for item in dd.items] + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='display_data', value=to_json_value(items))) + + +class Job(object): + """Wrapper for a dataflow Job protobuf.""" + + def __str__(self): + def encode_shortstrings(input_buffer, errors='strict'): + """Encoder (from Unicode) that suppresses long base64 strings.""" + original_len = len(input_buffer) + if original_len > 150: + if self.base64_str_re.match(input_buffer): + input_buffer = '<string of %d bytes>' % original_len + input_buffer = input_buffer.encode('ascii', errors=errors) + else: + matched = self.coder_str_re.match(input_buffer) + if matched: + input_buffer = '%s<string of %d bytes>' % ( + matched.group(1), matched.end(2) - matched.start(2)) + input_buffer = input_buffer.encode('ascii', errors=errors) + return input_buffer, original_len + + def decode_shortstrings(input_buffer, errors='strict'): + """Decoder (to Unicode) that suppresses long base64 strings.""" + shortened, length = encode_shortstrings(input_buffer, errors) + return unicode(shortened), length + + def shortstrings_registerer(encoding_name): + if encoding_name == 'shortstrings': + return codecs.CodecInfo(name='shortstrings', + encode=encode_shortstrings, + decode=decode_shortstrings) + return None + + codecs.register(shortstrings_registerer) + + # Use json "dump string" method to get readable formatting; + # further modify it to not output too-long strings, aimed at the + # 10,000+ character hex-encoded "serialized_fn" values. + return json.dumps( + json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'), + indent=2, sort_keys=True) + + @staticmethod + def default_job_name(job_name): + if job_name is None: + user_name = getpass.getuser().lower() + date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f') + app_name = 'beamapp' + job_name = '{}-{}-{}'.format(app_name, user_name, date_component) + return job_name + + def __init__(self, options): + self.options = options + self.google_cloud_options = options.view_as(GoogleCloudOptions) + if not self.google_cloud_options.job_name: + self.google_cloud_options.job_name = self.default_job_name( + self.google_cloud_options.job_name) + + required_google_cloud_options = ['project', 'job_name', 'temp_location'] + missing = [ + option for option in required_google_cloud_options + if not getattr(self.google_cloud_options, option)] + if missing: + raise ValueError( + 'Missing required configuration parameters: %s' % missing) + + if not self.google_cloud_options.staging_location: + logging.info('Defaulting to the temp_location as staging_location: %s', + self.google_cloud_options.temp_location) + (self.google_cloud_options + .staging_location) = self.google_cloud_options.temp_location + + # Make the staging and temp locations job name and time specific. This is + # needed to avoid clashes between job submissions using the same staging + # area or team members using same job names. This method is not entirely + # foolproof since two job submissions with same name can happen at exactly + # the same time. However the window is extremely small given that + # time.time() has at least microseconds granularity. We add the suffix only + # for GCS staging locations where the potential for such clashes is high. + if self.google_cloud_options.staging_location.startswith('gs://'): + path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time()) + self.google_cloud_options.staging_location = utils.path.join( + self.google_cloud_options.staging_location, path_suffix) + self.google_cloud_options.temp_location = utils.path.join( + self.google_cloud_options.temp_location, path_suffix) + self.proto = dataflow.Job(name=self.google_cloud_options.job_name) + if self.options.view_as(StandardOptions).streaming: + self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING + else: + self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH + self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$') + self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$') + + def json(self): + return encoding.MessageToJson(self.proto) + + def __reduce__(self): + """Reduce hook for pickling the Job class more easily.""" + return (Job, (self.options,)) + + +class DataflowApplicationClient(object): + """A Dataflow API client used by application code to create and query jobs.""" + + def __init__(self, options, environment_version): + """Initializes a Dataflow API client object.""" + self.standard_options = options.view_as(StandardOptions) + self.google_cloud_options = options.view_as(GoogleCloudOptions) + self.environment_version = environment_version + if self.google_cloud_options.no_auth: + credentials = None + else: + credentials = get_service_credentials() + self._client = dataflow.DataflowV1b3( + url=self.google_cloud_options.dataflow_endpoint, + credentials=credentials, + get_credentials=(not self.google_cloud_options.no_auth)) + self._storage_client = storage.StorageV1( + url='https://www.googleapis.com/storage/v1', + credentials=credentials, + get_credentials=(not self.google_cloud_options.no_auth)) + + # TODO(silviuc): Refactor so that retry logic can be applied. + @retry.no_retries # Using no_retries marks this as an integration point. + def _gcs_file_copy(self, from_path, to_path): + to_folder, to_name = os.path.split(to_path) + with open(from_path, 'rb') as f: + self.stage_file(to_folder, to_name, f) + + def stage_file(self, gcs_or_local_path, file_name, stream, + mime_type='application/octet-stream'): + """Stages a file at a GCS or local path with stream-supplied contents.""" + if not gcs_or_local_path.startswith('gs://'): + local_path = os.path.join(gcs_or_local_path, file_name) + logging.info('Staging file locally to %s', local_path) + with open(local_path, 'wb') as f: + f.write(stream.read()) + return + gcs_location = gcs_or_local_path + '/' + file_name + bucket, name = gcs_location[5:].split('/', 1) + + request = storage.StorageObjectsInsertRequest( + bucket=bucket, name=name) + logging.info('Starting GCS upload to %s...', gcs_location) + upload = storage.Upload(stream, mime_type) + try: + response = self._storage_client.objects.Insert(request, upload=upload) + except exceptions.HttpError as e: + reportable_errors = { + 403: 'access denied', + 404: 'bucket not found', + } + if e.status_code in reportable_errors: + raise IOError(('Could not upload to GCS path %s: %s. Please verify ' + 'that credentials are valid and that you have write ' + 'access to the specified path. Stale credentials can be ' + 'refreshed by executing "gcloud auth login".') % + (gcs_or_local_path, reportable_errors[e.status_code])) + raise + logging.info('Completed GCS upload to %s', gcs_location) + return response + + # TODO(silviuc): Refactor so that retry logic can be applied. + @retry.no_retries # Using no_retries marks this as an integration point. + def create_job(self, job): + """Creates job description. May stage and/or submit for remote execution.""" + self.create_job_description(job) + + # Stage and submit the job when necessary + dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file + template_location = ( + job.options.view_as(GoogleCloudOptions).template_location) + + job_location = template_location or dataflow_job_file + if job_location: + gcs_or_local_path = os.path.dirname(job_location) + file_name = os.path.basename(job_location) + self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) + + if not template_location: + return self.submit_job_description(job) + else: + return None + + def create_job_description(self, job): + """Creates a job described by the workflow proto.""" + resources = dependency.stage_job_resources( + job.options, file_copy=self._gcs_file_copy) + job.proto.environment = Environment( + packages=resources, options=job.options, + environment_version=self.environment_version).proto + # TODO(silviuc): Remove the debug logging eventually. + logging.info('JOB: %s', job) + + def submit_job_description(self, job): + """Creates and excutes a job request.""" + request = dataflow.DataflowProjectsJobsCreateRequest() + request.projectId = self.google_cloud_options.project + request.job = job.proto + + try: + response = self._client.projects_jobs.Create(request) + except exceptions.BadStatusCodeError as e: + logging.error('HTTP status %d trying to create job' + ' at dataflow service endpoint %s', + e.response.status, + self.google_cloud_options.dataflow_endpoint) + logging.fatal('details of server error: %s', e) + raise + logging.info('Create job: %s', response) + # The response is a Job proto with the id for the new job. + logging.info('Created job with id: [%s]', response.id) + logging.info( + 'To access the Dataflow monitoring console, please navigate to ' + 'https://console.developers.google.com/project/%s/dataflow/job/%s', + self.google_cloud_options.project, response.id) + + return response + + @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py + def modify_job_state(self, job_id, new_state): + """Modify the run state of the job. + + Args: + job_id: The id of the job. + new_state: A string representing the new desired state. It could be set to + either 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED' or 'JOB_STATE_DRAINING'. + + Returns: + True if the job was modified successfully. + """ + if new_state == 'JOB_STATE_DONE': + new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE + elif new_state == 'JOB_STATE_CANCELLED': + new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED + elif new_state == 'JOB_STATE_DRAINING': + new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING + else: + # Other states could only be set by the service. + return False + + request = dataflow.DataflowProjectsJobsUpdateRequest() + request.jobId = job_id + request.projectId = self.google_cloud_options.project + request.job = dataflow.Job(requestedState=new_state) + + self._client.projects_jobs.Update(request) + return True + + @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py + def get_job(self, job_id): + """Gets the job status for a submitted job. + + Args: + job_id: A string representing the job_id for the workflow as returned + by the a create_job() request. + + Returns: + A Job proto. See below for interesting fields. + + The Job proto returned from a get_job() request contains some interesting + fields: + currentState: An object representing the current state of the job. The + string representation of the object (str() result) has the following + possible values: JOB_STATE_UNKNONW, JOB_STATE_STOPPED, + JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED, + JOB_STATE_CANCELLED. + createTime: UTC time when the job was created + (e.g. '2015-03-10T00:01:53.074Z') + currentStateTime: UTC time for the current state of the job. + """ + request = dataflow.DataflowProjectsJobsGetRequest() + request.jobId = job_id + request.projectId = self.google_cloud_options.project + response = self._client.projects_jobs.Get(request) + return response + + @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py + def list_messages( + self, job_id, start_time=None, end_time=None, page_token=None, + minimum_importance=None): + """List messages associated with the execution of a job. + + Args: + job_id: A string representing the job_id for the workflow as returned + by the a create_job() request. + start_time: If specified, only messages generated after the start time + will be returned, otherwise all messages since job started will be + returned. The value is a string representing UTC time + (e.g., '2015-08-18T21:03:50.644Z') + end_time: If specified, only messages generated before the end time + will be returned, otherwise all messages up to current time will be + returned. The value is a string representing UTC time + (e.g., '2015-08-18T21:03:50.644Z') + page_token: A string to be used as next page token if the list call + returned paginated results. + minimum_importance: Filter for messages based on importance. The possible + string values in increasing order of importance are: JOB_MESSAGE_DEBUG, + JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING, + JOB_MESSAGE_ERROR. For example, a filter set on warning will allow only + warnings and errors and exclude all others. + + Returns: + A tuple consisting of a list of JobMessage instances and a + next page token string. + + Raises: + RuntimeError: if an unexpected value for the message_importance argument + is used. + + The JobMessage objects returned by the call contain the following fields: + id: A unique string identifier for the message. + time: A string representing the UTC time of the message + (e.g., '2015-08-18T21:03:50.644Z') + messageImportance: An enumeration value for the message importance. The + value if converted to string will have the following possible values: + JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, + JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR. + messageText: A message string. + """ + request = dataflow.DataflowProjectsJobsMessagesListRequest( + jobId=job_id, projectId=self.google_cloud_options.project) + if page_token is not None: + request.pageToken = page_token + if start_time is not None: + request.startTime = start_time + if end_time is not None: + request.endTime = end_time + if minimum_importance is not None: + if minimum_importance == 'JOB_MESSAGE_DEBUG': + request.minimumImportance = ( + dataflow.DataflowProjectsJobsMessagesListRequest + .MinimumImportanceValueValuesEnum + .JOB_MESSAGE_DEBUG) + elif minimum_importance == 'JOB_MESSAGE_DETAILED': + request.minimumImportance = ( + dataflow.DataflowProjectsJobsMessagesListRequest + .MinimumImportanceValueValuesEnum + .JOB_MESSAGE_DETAILED) + elif minimum_importance == 'JOB_MESSAGE_BASIC': + request.minimumImportance = ( + dataflow.DataflowProjectsJobsMessagesListRequest + .MinimumImportanceValueValuesEnum + .JOB_MESSAGE_BASIC) + elif minimum_importance == 'JOB_MESSAGE_WARNING': + request.minimumImportance = ( + dataflow.DataflowProjectsJobsMessagesListRequest + .MinimumImportanceValueValuesEnum + .JOB_MESSAGE_WARNING) + elif minimum_importance == 'JOB_MESSAGE_ERROR': + request.minimumImportance = ( + dataflow.DataflowProjectsJobsMessagesListRequest + .MinimumImportanceValueValuesEnum + .JOB_MESSAGE_ERROR) + else: + raise RuntimeError( + 'Unexpected value for minimum_importance argument: %r', + minimum_importance) + response = self._client.projects_jobs_messages.List(request) + return response.jobMessages, response.nextPageToken + + +class MetricUpdateTranslators(object): + """Translators between accumulators and dataflow metric updates.""" + + @staticmethod + def translate_boolean(accumulator, metric_update_proto): + metric_update_proto.boolean = accumulator.value + + @staticmethod + def translate_scalar_mean_int(accumulator, metric_update_proto): + if accumulator.count: + metric_update_proto.integerMean = dataflow.IntegerMean() + metric_update_proto.integerMean.sum = to_split_int(accumulator.sum) + metric_update_proto.integerMean.count = to_split_int(accumulator.count) + else: + metric_update_proto.nameAndKind.kind = None + + @staticmethod + def translate_scalar_mean_float(accumulator, metric_update_proto): + if accumulator.count: + metric_update_proto.floatingPointMean = dataflow.FloatingPointMean() + metric_update_proto.floatingPointMean.sum = accumulator.sum + metric_update_proto.floatingPointMean.count = to_split_int( + accumulator.count) + else: + metric_update_proto.nameAndKind.kind = None + + @staticmethod + def translate_scalar_counter_int(accumulator, metric_update_proto): + metric_update_proto.integer = to_split_int(accumulator.value) + + @staticmethod + def translate_scalar_counter_float(accumulator, metric_update_proto): + metric_update_proto.floatingPoint = accumulator.value + + +def to_split_int(n): + res = dataflow.SplitInt64() + res.lowBits = n & 0xffffffff + res.highBits = n >> 32 + return res + + +def translate_distribution(distribution_update, metric_update_proto): + """Translate metrics DistributionUpdate to dataflow distribution update.""" + dist_update_proto = dataflow.DistributionUpdate() + dist_update_proto.min = to_split_int(distribution_update.min) + dist_update_proto.max = to_split_int(distribution_update.max) + dist_update_proto.count = to_split_int(distribution_update.count) + dist_update_proto.sum = to_split_int(distribution_update.sum) + metric_update_proto.distribution = dist_update_proto + + +def translate_value(value, metric_update_proto): + metric_update_proto.integer = to_split_int(value) + + +def translate_scalar(accumulator, metric_update): + metric_update.scalar = to_json_value(accumulator.value, with_type=True) + + +def translate_mean(accumulator, metric_update): + if accumulator.count: + metric_update.meanSum = to_json_value(accumulator.sum, with_type=True) + metric_update.meanCount = to_json_value(accumulator.count, with_type=True) + else: + # A denominator of 0 will raise an error in the service. + # What it means is we have nothing to report yet, so don't. + metric_update.kind = None + + +# To enable a counter on the service, add it to this dictionary. +metric_translations = { + cy_combiners.CountCombineFn: ('sum', translate_scalar), + cy_combiners.SumInt64Fn: ('sum', translate_scalar), + cy_combiners.MinInt64Fn: ('min', translate_scalar), + cy_combiners.MaxInt64Fn: ('max', translate_scalar), + cy_combiners.MeanInt64Fn: ('mean', translate_mean), + cy_combiners.SumFloatFn: ('sum', translate_scalar), + cy_combiners.MinFloatFn: ('min', translate_scalar), + cy_combiners.MaxFloatFn: ('max', translate_scalar), + cy_combiners.MeanFloatFn: ('mean', translate_mean), + cy_combiners.AllCombineFn: ('and', translate_scalar), + cy_combiners.AnyCombineFn: ('or', translate_scalar), +} + +counter_translations = { + cy_combiners.CountCombineFn: ( + dataflow.NameAndKind.KindValueValuesEnum.SUM, + MetricUpdateTranslators.translate_scalar_counter_int), + cy_combiners.SumInt64Fn: ( + dataflow.NameAndKind.KindValueValuesEnum.SUM, + MetricUpdateTranslators.translate_scalar_counter_int), + cy_combiners.MinInt64Fn: ( + dataflow.NameAndKind.KindValueValuesEnum.MIN, + MetricUpdateTranslators.translate_scalar_counter_int), + cy_combiners.MaxInt64Fn: ( + dataflow.NameAndKind.KindValueValuesEnum.MAX, + MetricUpdateTranslators.translate_scalar_counter_int), + cy_combiners.MeanInt64Fn: ( + dataflow.NameAndKind.KindValueValuesEnum.MEAN, + MetricUpdateTranslators.translate_scalar_mean_int), + cy_combiners.SumFloatFn: ( + dataflow.NameAndKind.KindValueValuesEnum.SUM, + MetricUpdateTranslators.translate_scalar_counter_float), + cy_combiners.MinFloatFn: ( + dataflow.NameAndKind.KindValueValuesEnum.MIN, + MetricUpdateTranslators.translate_scalar_counter_float), + cy_combiners.MaxFloatFn: ( + dataflow.NameAndKind.KindValueValuesEnum.MAX, + MetricUpdateTranslators.translate_scalar_counter_float), + cy_combiners.MeanFloatFn: ( + dataflow.NameAndKind.KindValueValuesEnum.MEAN, + MetricUpdateTranslators.translate_scalar_mean_float), + cy_combiners.AllCombineFn: ( + dataflow.NameAndKind.KindValueValuesEnum.AND, + MetricUpdateTranslators.translate_boolean), + cy_combiners.AnyCombineFn: ( + dataflow.NameAndKind.KindValueValuesEnum.OR, + MetricUpdateTranslators.translate_boolean), +} http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py new file mode 100644 index 0000000..bc57211 --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/apiclient_test.py @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Unit tests for the apiclient module.""" + +import unittest + +from mock import Mock + +from apache_beam.metrics.cells import DistributionData +from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner +from apache_beam.runners.google_cloud_dataflow.internal import apiclient +from apache_beam.runners.google_cloud_dataflow.internal.clients import dataflow +from apache_beam.utils.pipeline_options import PipelineOptions + + +class UtilTest(unittest.TestCase): + + @unittest.skip("Enable once BEAM-1080 is fixed.") + def test_create_application_client(self): + pipeline_options = PipelineOptions() + apiclient.DataflowApplicationClient( + pipeline_options, + DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION) + + def test_default_job_name(self): + job_name = apiclient.Job.default_job_name(None) + regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}' + self.assertRegexpMatches(job_name, regexp) + + def test_split_int(self): + number = 12345 + split_number = apiclient.to_split_int(number) + self.assertEqual((split_number.lowBits, split_number.highBits), + (number, 0)) + shift_number = number << 32 + split_number = apiclient.to_split_int(shift_number) + self.assertEqual((split_number.lowBits, split_number.highBits), + (0, number)) + + def test_translate_distribution(self): + metric_update = dataflow.CounterUpdate() + distribution_update = DistributionData(16, 2, 1, 15) + apiclient.translate_distribution(distribution_update, metric_update) + self.assertEqual(metric_update.distribution.min.lowBits, + distribution_update.min) + self.assertEqual(metric_update.distribution.max.lowBits, + distribution_update.max) + self.assertEqual(metric_update.distribution.sum.lowBits, + distribution_update.sum) + self.assertEqual(metric_update.distribution.count.lowBits, + distribution_update.count) + + def test_translate_means(self): + metric_update = dataflow.CounterUpdate() + accumulator = Mock() + accumulator.sum = 16 + accumulator.count = 2 + apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator, + metric_update) + self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum) + self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count) + + accumulator.sum = 16.0 + accumulator.count = 2 + apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator, + metric_update) + self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum) + self.assertEqual( + metric_update.floatingPointMean.count.lowBits, accumulator.count) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py new file mode 100644 index 0000000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py new file mode 100644 index 0000000..a6857f8 --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/__init__.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Common imports for generated dataflow client library.""" +# pylint:disable=wildcard-import + +import pkgutil + +from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_messages import * + +from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow.dataflow_v1b3_client import * + +__path__ = pkgutil.extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/beam/blob/4337c3ec/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py new file mode 100644 index 0000000..4d3d525 --- /dev/null +++ b/sdks/python/apache_beam/runners/google_cloud_dataflow/internal/clients/dataflow/dataflow_v1b3_client.py @@ -0,0 +1,684 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Generated client library for dataflow version v1b3.""" +# NOTE: This file is autogenerated and should not be edited by hand. +from apitools.base.py import base_api + +from apache_beam.runners.google_cloud_dataflow.internal.clients.dataflow import dataflow_v1b3_messages as messages + + +class DataflowV1b3(base_api.BaseApiClient): + """Generated client library for service dataflow version v1b3.""" + + MESSAGES_MODULE = messages + BASE_URL = u'https://dataflow.googleapis.com/' + + _PACKAGE = u'dataflow' + _SCOPES = [u'https://www.googleapis.com/auth/cloud-platform', u'https://www.googleapis.com/auth/userinfo.email'] + _VERSION = u'v1b3' + _CLIENT_ID = '1042881264118.apps.googleusercontent.com' + _CLIENT_SECRET = 'x_Tw5K8nnjoRAqULM9PFAC2b' + _USER_AGENT = 'x_Tw5K8nnjoRAqULM9PFAC2b' + _CLIENT_CLASS_NAME = u'DataflowV1b3' + _URL_VERSION = u'v1b3' + _API_KEY = None + + def __init__(self, url='', credentials=None, + get_credentials=True, http=None, model=None, + log_request=False, log_response=False, + credentials_args=None, default_global_params=None, + additional_http_headers=None): + """Create a new dataflow handle.""" + url = url or self.BASE_URL + super(DataflowV1b3, self).__init__( + url, credentials=credentials, + get_credentials=get_credentials, http=http, model=model, + log_request=log_request, log_response=log_response, + credentials_args=credentials_args, + default_global_params=default_global_params, + additional_http_headers=additional_http_headers) + self.projects_jobs_debug = self.ProjectsJobsDebugService(self) + self.projects_jobs_messages = self.ProjectsJobsMessagesService(self) + self.projects_jobs_workItems = self.ProjectsJobsWorkItemsService(self) + self.projects_jobs = self.ProjectsJobsService(self) + self.projects_locations_jobs_messages = self.ProjectsLocationsJobsMessagesService(self) + self.projects_locations_jobs_workItems = self.ProjectsLocationsJobsWorkItemsService(self) + self.projects_locations_jobs = self.ProjectsLocationsJobsService(self) + self.projects_locations = self.ProjectsLocationsService(self) + self.projects_templates = self.ProjectsTemplatesService(self) + self.projects = self.ProjectsService(self) + + class ProjectsJobsDebugService(base_api.BaseApiService): + """Service class for the projects_jobs_debug resource.""" + + _NAME = u'projects_jobs_debug' + + def __init__(self, client): + super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client) + self._upload_configs = { + } + + def GetConfig(self, request, global_params=None): + """Get encoded debug configuration for component. Not cacheable. + + Args: + request: (DataflowProjectsJobsDebugGetConfigRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (GetDebugConfigResponse) The response message. + """ + config = self.GetMethodConfig('GetConfig') + return self._RunMethod( + config, request, global_params=global_params) + + GetConfig.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.debug.getConfig', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', + request_field=u'getDebugConfigRequest', + request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', + response_type_name=u'GetDebugConfigResponse', + supports_download=False, + ) + + def SendCapture(self, request, global_params=None): + """Send encoded debug capture data for component. + + Args: + request: (DataflowProjectsJobsDebugSendCaptureRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (SendDebugCaptureResponse) The response message. + """ + config = self.GetMethodConfig('SendCapture') + return self._RunMethod( + config, request, global_params=global_params) + + SendCapture.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.debug.sendCapture', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture', + request_field=u'sendDebugCaptureRequest', + request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest', + response_type_name=u'SendDebugCaptureResponse', + supports_download=False, + ) + + class ProjectsJobsMessagesService(base_api.BaseApiService): + """Service class for the projects_jobs_messages resource.""" + + _NAME = u'projects_jobs_messages' + + def __init__(self, client): + super(DataflowV1b3.ProjectsJobsMessagesService, self).__init__(client) + self._upload_configs = { + } + + def List(self, request, global_params=None): + """Request the job status. + + Args: + request: (DataflowProjectsJobsMessagesListRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ListJobMessagesResponse) The response message. + """ + config = self.GetMethodConfig('List') + return self._RunMethod( + config, request, global_params=global_params) + + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.messages.list', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'endTime', u'location', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages', + request_field='', + request_type_name=u'DataflowProjectsJobsMessagesListRequest', + response_type_name=u'ListJobMessagesResponse', + supports_download=False, + ) + + class ProjectsJobsWorkItemsService(base_api.BaseApiService): + """Service class for the projects_jobs_workItems resource.""" + + _NAME = u'projects_jobs_workItems' + + def __init__(self, client): + super(DataflowV1b3.ProjectsJobsWorkItemsService, self).__init__(client) + self._upload_configs = { + } + + def Lease(self, request, global_params=None): + """Leases a dataflow WorkItem to run. + + Args: + request: (DataflowProjectsJobsWorkItemsLeaseRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (LeaseWorkItemResponse) The response message. + """ + config = self.GetMethodConfig('Lease') + return self._RunMethod( + config, request, global_params=global_params) + + Lease.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.workItems.lease', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease', + request_field=u'leaseWorkItemRequest', + request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest', + response_type_name=u'LeaseWorkItemResponse', + supports_download=False, + ) + + def ReportStatus(self, request, global_params=None): + """Reports the status of dataflow WorkItems leased by a worker. + + Args: + request: (DataflowProjectsJobsWorkItemsReportStatusRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ReportWorkItemStatusResponse) The response message. + """ + config = self.GetMethodConfig('ReportStatus') + return self._RunMethod( + config, request, global_params=global_params) + + ReportStatus.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.workItems.reportStatus', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus', + request_field=u'reportWorkItemStatusRequest', + request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest', + response_type_name=u'ReportWorkItemStatusResponse', + supports_download=False, + ) + + class ProjectsJobsService(base_api.BaseApiService): + """Service class for the projects_jobs resource.""" + + _NAME = u'projects_jobs' + + def __init__(self, client): + super(DataflowV1b3.ProjectsJobsService, self).__init__(client) + self._upload_configs = { + } + + def Create(self, request, global_params=None): + """Creates a Cloud Dataflow job. + + Args: + request: (DataflowProjectsJobsCreateRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Create') + return self._RunMethod( + config, request, global_params=global_params) + + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.create', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'location', u'replaceJobId', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs', + request_field=u'job', + request_type_name=u'DataflowProjectsJobsCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + + def Get(self, request, global_params=None): + """Gets the state of the specified Cloud Dataflow job. + + Args: + request: (DataflowProjectsJobsGetRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Get') + return self._RunMethod( + config, request, global_params=global_params) + + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.get', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', + request_field='', + request_type_name=u'DataflowProjectsJobsGetRequest', + response_type_name=u'Job', + supports_download=False, + ) + + def GetMetrics(self, request, global_params=None): + """Request the job status. + + Args: + request: (DataflowProjectsJobsGetMetricsRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (JobMetrics) The response message. + """ + config = self.GetMethodConfig('GetMetrics') + return self._RunMethod( + config, request, global_params=global_params) + + GetMetrics.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.getMetrics', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics', + request_field='', + request_type_name=u'DataflowProjectsJobsGetMetricsRequest', + response_type_name=u'JobMetrics', + supports_download=False, + ) + + def List(self, request, global_params=None): + """List the jobs of a project. + + Args: + request: (DataflowProjectsJobsListRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ListJobsResponse) The response message. + """ + config = self.GetMethodConfig('List') + return self._RunMethod( + config, request, global_params=global_params) + + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.list', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs', + request_field='', + request_type_name=u'DataflowProjectsJobsListRequest', + response_type_name=u'ListJobsResponse', + supports_download=False, + ) + + def Update(self, request, global_params=None): + """Updates the state of an existing Cloud Dataflow job. + + Args: + request: (DataflowProjectsJobsUpdateRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Update') + return self._RunMethod( + config, request, global_params=global_params) + + Update.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'PUT', + method_id=u'dataflow.projects.jobs.update', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', + request_field=u'job', + request_type_name=u'DataflowProjectsJobsUpdateRequest', + response_type_name=u'Job', + supports_download=False, + ) + + class ProjectsLocationsJobsMessagesService(base_api.BaseApiService): + """Service class for the projects_locations_jobs_messages resource.""" + + _NAME = u'projects_locations_jobs_messages' + + def __init__(self, client): + super(DataflowV1b3.ProjectsLocationsJobsMessagesService, self).__init__(client) + self._upload_configs = { + } + + def List(self, request, global_params=None): + """Request the job status. + + Args: + request: (DataflowProjectsLocationsJobsMessagesListRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ListJobMessagesResponse) The response message. + """ + config = self.GetMethodConfig('List') + return self._RunMethod( + config, request, global_params=global_params) + + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.messages.list', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'endTime', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest', + response_type_name=u'ListJobMessagesResponse', + supports_download=False, + ) + + class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService): + """Service class for the projects_locations_jobs_workItems resource.""" + + _NAME = u'projects_locations_jobs_workItems' + + def __init__(self, client): + super(DataflowV1b3.ProjectsLocationsJobsWorkItemsService, self).__init__(client) + self._upload_configs = { + } + + def Lease(self, request, global_params=None): + """Leases a dataflow WorkItem to run. + + Args: + request: (DataflowProjectsLocationsJobsWorkItemsLeaseRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (LeaseWorkItemResponse) The response message. + """ + config = self.GetMethodConfig('Lease') + return self._RunMethod( + config, request, global_params=global_params) + + Lease.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.workItems.lease', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease', + request_field=u'leaseWorkItemRequest', + request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest', + response_type_name=u'LeaseWorkItemResponse', + supports_download=False, + ) + + def ReportStatus(self, request, global_params=None): + """Reports the status of dataflow WorkItems leased by a worker. + + Args: + request: (DataflowProjectsLocationsJobsWorkItemsReportStatusRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ReportWorkItemStatusResponse) The response message. + """ + config = self.GetMethodConfig('ReportStatus') + return self._RunMethod( + config, request, global_params=global_params) + + ReportStatus.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus', + request_field=u'reportWorkItemStatusRequest', + request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest', + response_type_name=u'ReportWorkItemStatusResponse', + supports_download=False, + ) + + class ProjectsLocationsJobsService(base_api.BaseApiService): + """Service class for the projects_locations_jobs resource.""" + + _NAME = u'projects_locations_jobs' + + def __init__(self, client): + super(DataflowV1b3.ProjectsLocationsJobsService, self).__init__(client) + self._upload_configs = { + } + + def Create(self, request, global_params=None): + """Creates a Cloud Dataflow job. + + Args: + request: (DataflowProjectsLocationsJobsCreateRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Create') + return self._RunMethod( + config, request, global_params=global_params) + + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.create', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'replaceJobId', u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', + request_field=u'job', + request_type_name=u'DataflowProjectsLocationsJobsCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + + def Get(self, request, global_params=None): + """Gets the state of the specified Cloud Dataflow job. + + Args: + request: (DataflowProjectsLocationsJobsGetRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Get') + return self._RunMethod( + config, request, global_params=global_params) + + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.get', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsGetRequest', + response_type_name=u'Job', + supports_download=False, + ) + + def GetMetrics(self, request, global_params=None): + """Request the job status. + + Args: + request: (DataflowProjectsLocationsJobsGetMetricsRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (JobMetrics) The response message. + """ + config = self.GetMethodConfig('GetMetrics') + return self._RunMethod( + config, request, global_params=global_params) + + GetMetrics.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.getMetrics', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'startTime'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest', + response_type_name=u'JobMetrics', + supports_download=False, + ) + + def List(self, request, global_params=None): + """List the jobs of a project. + + Args: + request: (DataflowProjectsLocationsJobsListRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (ListJobsResponse) The response message. + """ + config = self.GetMethodConfig('List') + return self._RunMethod( + config, request, global_params=global_params) + + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.list', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'filter', u'pageSize', u'pageToken', u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsListRequest', + response_type_name=u'ListJobsResponse', + supports_download=False, + ) + + def Update(self, request, global_params=None): + """Updates the state of an existing Cloud Dataflow job. + + Args: + request: (DataflowProjectsLocationsJobsUpdateRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Update') + return self._RunMethod( + config, request, global_params=global_params) + + Update.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'PUT', + method_id=u'dataflow.projects.locations.jobs.update', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', + request_field=u'job', + request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest', + response_type_name=u'Job', + supports_download=False, + ) + + class ProjectsLocationsService(base_api.BaseApiService): + """Service class for the projects_locations resource.""" + + _NAME = u'projects_locations' + + def __init__(self, client): + super(DataflowV1b3.ProjectsLocationsService, self).__init__(client) + self._upload_configs = { + } + + class ProjectsTemplatesService(base_api.BaseApiService): + """Service class for the projects_templates resource.""" + + _NAME = u'projects_templates' + + def __init__(self, client): + super(DataflowV1b3.ProjectsTemplatesService, self).__init__(client) + self._upload_configs = { + } + + def Create(self, request, global_params=None): + """Creates a Cloud Dataflow job from a template. + + Args: + request: (DataflowProjectsTemplatesCreateRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (Job) The response message. + """ + config = self.GetMethodConfig('Create') + return self._RunMethod( + config, request, global_params=global_params) + + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.templates.create', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/templates', + request_field=u'createJobFromTemplateRequest', + request_type_name=u'DataflowProjectsTemplatesCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + + class ProjectsService(base_api.BaseApiService): + """Service class for the projects resource.""" + + _NAME = u'projects' + + def __init__(self, client): + super(DataflowV1b3.ProjectsService, self).__init__(client) + self._upload_configs = { + } + + def WorkerMessages(self, request, global_params=None): + """Send a worker_message to the service. + + Args: + request: (DataflowProjectsWorkerMessagesRequest) input message + global_params: (StandardQueryParameters, default: None) global arguments + Returns: + (SendWorkerMessagesResponse) The response message. + """ + config = self.GetMethodConfig('WorkerMessages') + return self._RunMethod( + config, request, global_params=global_params) + + WorkerMessages.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.workerMessages', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/WorkerMessages', + request_field=u'sendWorkerMessagesRequest', + request_type_name=u'DataflowProjectsWorkerMessagesRequest', + response_type_name=u'SendWorkerMessagesResponse', + supports_download=False, + )
