Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 5541c0305 -> fa302a3be

Deletes some code that is not used by the SDK. Also deletes the corresponding tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d50d804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d50d804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d50d804

Branch: refs/heads/python-sdk
Commit: 7d50d8040585e0cea5bc02de4cb199f29c1472fc
Parents: 5541c03
Author: Chamikara Jayalath <[email protected]>
Authored: Fri Jul 29 15:40:39 2016 -0700
Committer: Chamikara Jayalath <[email protected]>
Committed: Fri Jul 29 15:40:39 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 347 -------------------
 .../apache_beam/internal/apiclient_test.py      |  92 +----
 2 files changed, 7 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 137a40b..bc4a4e0 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -29,13 +29,10 @@ from apitools.base.py import encoding
 from apitools.base.py import exceptions
 
 from apache_beam import utils
-from apache_beam.internal import pickler
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.json_value import to_json_value
-from apache_beam.io import iobase
 from apache_beam.transforms import cy_combiners
 from apache_beam.utils import dependency
-from apache_beam.utils import names
 from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.dependency import get_sdk_name_and_version
@@ -53,77 +50,6 @@ COMPUTE_API_SERVICE = 'compute.googleapis.com'
 STORAGE_API_SERVICE = 'storage.googleapis.com'
 
 
-def append_counter(status_object, counter, tentative):
-  """Appends a counter to the status.
-
-  Args:
-    status_object: a work_item_status to which to add this counter
-    counter: a counters.Counter object to append
-    tentative: whether the value should be reported as tentative
-  """
-  logging.debug('Appending counter%s %s',
-                ' (tentative)' if tentative else '',
-                counter)
-  kind, setter = metric_translations[counter.combine_fn.__class__]
-  append_metric(
-      status_object, counter.name, kind, counter.accumulator,
-      setter, tentative=tentative)
-
-
-def append_metric(status_object, metric_name, kind, value, setter=None,
-                  step=None, output_user_name=None, tentative=False,
-                  worker_id=None, cumulative=True):
-  """Creates and adds a MetricUpdate field to the passed-in protobuf.
-
-  Args:
-    status_object: a work_item_status to which to add this metric
-    metric_name: a string naming this metric
-    kind: dataflow counter kind (e.g. 'sum')
-    value: accumulator value to encode
-    setter: if not None, a lambda to use to update metric_update with value
-    step: the name of the associated step
-    output_user_name: the user-visible name to use
-    tentative: whether this should be labeled as a tentative metric
-    worker_id: the id of this worker. Specifying a worker_id also
-      causes this to be encoded as a metric, not a counter.
-    cumulative: Whether this metric is cumulative, default True.
-      Set to False for a delta value.
-  """
-  # Does this look like a counter or like a metric?
-  is_counter = not worker_id
-
-  metric_update = dataflow.MetricUpdate()
-  metric_update.name = dataflow.MetricStructuredName()
-  metric_update.name.name = metric_name
-  # Handle attributes stored in the name context
-  if step or output_user_name or tentative or worker_id:
-    metric_update.name.context = dataflow.MetricStructuredName.ContextValue()
-
-    def append_to_context(key, value):
-      metric_update.name.context.additionalProperties.append(
-          dataflow.MetricStructuredName.ContextValue.AdditionalProperty(
-              key=key, value=value))
-    if step:
-      append_to_context('step', step)
-    if output_user_name:
-      append_to_context('output_user_name', output_user_name)
-    if tentative:
-      append_to_context('tentative', 'true')
-    if worker_id:
-      append_to_context('workerId', worker_id)
-  if cumulative and is_counter:
-    metric_update.cumulative = cumulative
-  if is_counter:
-    # Counters are distinguished by having a kind; metrics do not.
-    metric_update.kind = kind
-  if setter:
-    setter(value, metric_update)
-  else:
-    metric_update.scalar = to_json_value(value, with_type=True)
-  logging.debug('Appending metric_update: %s', metric_update)
-  status_object.metricUpdates.append(metric_update)
-
-
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
@@ -615,238 +541,6 @@ class DataflowApplicationClient(object):
     return response.jobMessages, response.nextPageToken
 
 
-class DataflowWorkerClient(object):
-  """A Dataflow API client used by worker code to lease work items."""
-
-  def __init__(self, worker, skip_get_credentials=False):
-    """Initializes a Dataflow API client object with worker functionality.
-
-    Args:
-      worker: A Worker instance.
-      skip_get_credentials: If true disables credentials loading logic.
-    """
-    self._client = (
-        dataflow.DataflowV1b3(
-            url=worker.service_path,
-            get_credentials=(not skip_get_credentials)))
-
-  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def lease_work(self, worker_info, desired_lease_duration):
-    """Leases a work item from the service."""
-    work_request = dataflow.LeaseWorkItemRequest()
-    work_request.workerId = worker_info.worker_id
-    work_request.requestedLeaseDuration = desired_lease_duration
-    work_request.currentWorkerTime = worker_info.formatted_current_time
-    work_request.workerCapabilities.append(worker_info.worker_id)
-    for value in worker_info.capabilities:
-      work_request.workerCapabilities.append(value)
-    for value in worker_info.work_types:
-      work_request.workItemTypes.append(value)
-    request = dataflow.DataflowProjectsJobsWorkItemsLeaseRequest()
-    request.jobId = worker_info.job_id
-    request.projectId = worker_info.project_id
-    try:
-      request.leaseWorkItemRequest = work_request
-    except AttributeError:
-      request.lease_work_item_request = work_request
-    logging.debug('lease_work: %s', request)
-    response = self._client.projects_jobs_workItems.Lease(request)
-    logging.debug('lease_work: %s', response)
-    return response
-
-  def report_status(self,
-                    worker_info,
-                    desired_lease_duration,
-                    work_item,
-                    completed,
-                    progress,
-                    dynamic_split_result_to_report=None,
-                    source_operation_response=None,
-                    exception_details=None):
-    """Reports status for a work item (success or failure).
-
-    This is an integration point. The @retry decorator is used on callers
-    of this method defined in apache_beam/worker/worker.py because
-    there are different retry strategies for a completed versus in progress
-    work item.
-
-    Args:
-      worker_info: A batchworker.BatchWorkerInfo that contains
-        information about the Worker instance executing the work
-        item.
-      desired_lease_duration: The duration for which the worker would like to
-        extend the lease of the work item. Should be in seconds formatted as a
-        string.
-      work_item: The work item for which to report status.
-      completed: True if there is no further work to be done on this work item
-        either because it succeeded or because it failed. False if this is a
-        progress report.
-      progress: A SourceReaderProgress that gives the progress of worker
-        handling the work item.
-      dynamic_split_result_to_report: A successful dynamic split result that
-        should be sent to the Dataflow service along with the status report.
-      source_operation_response: Response to a source operation request from
-        the service. This will be sent to the service along with the status
-        report.
-      exception_details: A string representation of the stack trace for an
-        exception raised while executing the work item. The string is the
-        output of the standard traceback.format_exc() function.
-
-    Returns:
-      A protobuf containing the response from the service for the status
-      update (WorkItemServiceState).
-
-    Raises:
-      TypeError: if progress is of an unknown type
-      RuntimeError: if dynamic split request is of an unknown type.
-    """
-    work_item_status = dataflow.WorkItemStatus()
-    work_item_status.completed = completed
-
-    if not completed:
-      work_item_status.requestedLeaseDuration = desired_lease_duration
-
-    if progress is not None:
-      work_item_progress = dataflow.ApproximateProgress()
-      work_item_status.progress = work_item_progress
-
-      if progress.position is not None:
-        work_item_progress.position = (
-            reader_position_to_cloud_position(progress.position))
-      elif progress.percent_complete is not None:
-        work_item_progress.percentComplete = progress.percent_complete
-      elif progress.remaining_time is not None:
-        work_item_progress.remainingTime = progress.remaining_time
-      else:
-        raise TypeError('Unknown type of progress')
-
-    if dynamic_split_result_to_report is not None:
-      assert isinstance(dynamic_split_result_to_report,
-                        iobase.DynamicSplitResult)
-
-      if isinstance(dynamic_split_result_to_report,
-                    iobase.DynamicSplitResultWithPosition):
-        work_item_status.stopPosition = (
-            dynamic_split_result_with_position_to_cloud_stop_position(
-                dynamic_split_result_to_report))
-      else:
-        raise RuntimeError('Unknown type of dynamic split result.')
-
-    # The service keeps track of the report indexes in order to handle lost
-    # and duplicate messages.
-    work_item_status.reportIndex = work_item.next_report_index
-    work_item_status.workItemId = str(work_item.proto.id)
-
-    # Add exception information if any.
-    if exception_details is not None:
-      status = dataflow.Status()
-      # TODO(silviuc): Replace Code.UNKNOWN with a generated definition.
-      status.code = 2
-      # TODO(silviuc): Attach the stack trace as exception details.
-      status.message = exception_details
-      work_item_status.errors.append(status)
-
-    if source_operation_response is not None:
-      work_item_status.sourceOperationResponse = source_operation_response
-
-    # Look through the work item for metrics to send.
-    if work_item.map_task:
-      for counter in work_item.map_task.itercounters():
-        append_counter(work_item_status, counter, tentative=not completed)
-
-    report_request = dataflow.ReportWorkItemStatusRequest()
-    report_request.currentWorkerTime = worker_info.formatted_current_time
-    report_request.workerId = worker_info.worker_id
-    report_request.workItemStatuses.append(work_item_status)
-
-    request = dataflow.DataflowProjectsJobsWorkItemsReportStatusRequest()
-    request.jobId = worker_info.job_id
-    request.projectId = worker_info.project_id
-    try:
-      request.reportWorkItemStatusRequest = report_request
-    except AttributeError:
-      request.report_work_item_status_request = report_request
-    logging.debug('report_status: %s', request)
-    response = self._client.projects_jobs_workItems.ReportStatus(request)
-    logging.debug('report_status: %s', response)
-    return response
-
-# Utility functions for translating cloud reader objects to corresponding SDK
-# reader objects and vice versa.
-
-
-def reader_progress_to_cloud_progress(reader_progress):
-  """Converts a given 'ReaderProgress' to corresponding cloud format."""
-
-  cloud_progress = dataflow.ApproximateProgress()
-  if reader_progress.position is not None:
-    cloud_progress.position = reader_position_to_cloud_position(
-        reader_progress.position)
-  if reader_progress.percent_complete is not None:
-    cloud_progress.percentComplete = reader_progress.percent_complete
-  if reader_progress.remaining_time is not None:
-    cloud_progress.remainingTime = reader_progress.remaining_time
-
-  return cloud_progress
-
-
-def reader_position_to_cloud_position(reader_position):
-  """Converts a given 'ReaderPosition' to corresponding cloud format."""
-
-  cloud_position = dataflow.Position()
-  if reader_position.end is not None:
-    cloud_position.end = reader_position.end
-  if reader_position.key is not None:
-    cloud_position.key = reader_position.key
-  if reader_position.byte_offset is not None:
-    cloud_position.byteOffset = reader_position.byte_offset
-  if reader_position.record_index is not None:
-    cloud_position.recordIndex = reader_position.record_index
-  if reader_position.shuffle_position is not None:
-    cloud_position.shufflePosition = reader_position.shuffle_position
-  if reader_position.concat_position is not None:
-    concat_position = dataflow.ConcatPosition()
-    concat_position.index = reader_position.concat_position.index
-    concat_position.position = reader_position_to_cloud_position(
-        reader_position.concat_position.position)
-    cloud_position.concatPosition = concat_position
-
-  return cloud_position
-
-
-def dynamic_split_result_with_position_to_cloud_stop_position(split_result):
-  """Converts a given 'DynamicSplitResultWithPosition' to cloud format."""
-
-  return reader_position_to_cloud_position(split_result.stop_position)
-
-
-def cloud_progress_to_reader_progress(cloud_progress):
-  reader_position = None
-  if cloud_progress.position is not None:
-    reader_position = cloud_position_to_reader_position(cloud_progress.position)
-  return iobase.ReaderProgress(reader_position, cloud_progress.percentComplete,
-                               cloud_progress.remainingTime)
-
-
-def cloud_position_to_reader_position(cloud_position):
-  concat_position = None
-  if cloud_position.concatPosition is not None:
-    inner_position = cloud_position_to_reader_position(
-        cloud_position.concatPosition.position)
-    concat_position = iobase.ConcatPosition(cloud_position.index,
-                                            inner_position)
-
-  return iobase.ReaderPosition(cloud_position.end, cloud_position.key,
-                               cloud_position.byteOffset,
-                               cloud_position.recordIndex,
-                               cloud_position.shufflePosition, concat_position)
-
-
-def approximate_progress_to_dynamic_split_request(approximate_progress):
-  return iobase.DynamicSplitRequest(cloud_progress_to_reader_progress(
-      approximate_progress))
-
-
 def set_scalar(accumulator, metric_update):
   metric_update.scalar = to_json_value(accumulator.value, with_type=True)
@@ -875,44 +569,3 @@ metric_translations = {
     cy_combiners.AllCombineFn: ('and', set_scalar),
     cy_combiners.AnyCombineFn: ('or', set_scalar),
 }
-
-
-def splits_to_split_response(bundles):
-  """Generates a response to a custom source split request.
-
-  Args:
-    bundles: a set of bundles generated by a BoundedSource.split() invocation.
-  Returns:
-    a SourceOperationResponse object.
-  """
-  derived_sources = []
-  for bundle in bundles:
-    derived_source = dataflow.DerivedSource()
-    derived_source.derivationMode = (
-        dataflow.DerivedSource.DerivationModeValueValuesEnum
-        .SOURCE_DERIVATION_MODE_INDEPENDENT)
-    derived_source.source = dataflow.Source()
-    derived_source.source.doesNotNeedSplitting = True
-
-    derived_source.source.spec = dataflow.Source.SpecValue()
-    derived_source.source.spec.additionalProperties.append(
-        dataflow.Source.SpecValue.AdditionalProperty(
-            key=names.SERIALIZED_SOURCE_KEY,
-            value=to_json_value(pickler.dumps(
-                (bundle.source, bundle.start_position, bundle.stop_position)),
-                with_type=True)))
-    derived_source.source.spec.additionalProperties.append(
-        dataflow.Source.SpecValue.AdditionalProperty(key='@type',
-                                                     value=to_json_value(
-                                                         names.SOURCE_TYPE)))
-    derived_sources.append(derived_source)
-
-  split_response = dataflow.SourceSplitResponse()
-  split_response.bundles = derived_sources
-  split_response.outcome = (
-      dataflow.SourceSplitResponse.OutcomeValueValuesEnum
-      .SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED)
-
-  response = dataflow.SourceOperationResponse()
-  response.split = split_response
-  return response


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 68ad842..8fddae7 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,96 +18,18 @@
 
 import unittest
 
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
 from apache_beam.internal import apiclient
-from apache_beam.io import iobase
-
-import apache_beam.internal.clients.dataflow as dataflow
 
 
 class UtilTest(unittest.TestCase):
 
-  def test_reader_progress_to_cloud_progress_position(self):
-    reader_position = iobase.ReaderPosition(byte_offset=9999)
-    reader_progress = iobase.ReaderProgress(position=reader_position)
-
-    cloud_progress = apiclient.reader_progress_to_cloud_progress(
-        reader_progress)
-    self.assertIsNotNone(cloud_progress)
-    self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
-    self.assertIsNotNone(cloud_progress.position)
-    self.assertIsInstance(cloud_progress.position, dataflow.Position)
-    self.assertEquals(9999, cloud_progress.position.byteOffset)
-
-  def test_reader_progress_to_cloud_progress_percent_complete(self):
-    reader_progress = iobase.ReaderProgress(percent_complete=0.123)
-
-    cloud_progress = apiclient.reader_progress_to_cloud_progress(
-        reader_progress)
-    self.assertIsNotNone(cloud_progress)
-    self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
-    self.assertIsNotNone(cloud_progress.percentComplete)
-    self.assertEquals(0.123, cloud_progress.percentComplete)
-
-  def test_reader_position_to_cloud_position(self):
-    reader_position = iobase.ReaderPosition(byte_offset=9999)
-
-    cloud_position = apiclient.reader_position_to_cloud_position(
-        reader_position)
-    self.assertIsNotNone(cloud_position)
-
-  def test_dynamic_split_result_with_position_to_cloud_stop_position(self):
-    position = iobase.ReaderPosition(byte_offset=9999)
-    dynamic_split_result = iobase.DynamicSplitResultWithPosition(position)
-
-    approximate_position = (
-        apiclient.dynamic_split_result_with_position_to_cloud_stop_position(
-            dynamic_split_result))
-    self.assertIsNotNone(approximate_position)
-    self.assertIsInstance(approximate_position, dataflow.Position)
-    self.assertEqual(9999, approximate_position.byteOffset)
-
-  def test_cloud_progress_to_reader_progress_index_position(self):
-    cloud_progress = dataflow.ApproximateProgress()
-    cloud_progress.position = dataflow.Position()
-    cloud_progress.position.byteOffset = 9999
-
-    reader_progress = apiclient.cloud_progress_to_reader_progress(
-        cloud_progress)
-    self.assertIsNotNone(reader_progress.position)
-    self.assertIsInstance(reader_progress.position, iobase.ReaderPosition)
-    self.assertEqual(9999, reader_progress.position.byte_offset)
-
-  def test_cloud_progress_to_reader_progress_percent_complete(self):
-    cloud_progress = dataflow.ApproximateProgress()
-    cloud_progress.percentComplete = 0.123
-
-    reader_progress = apiclient.cloud_progress_to_reader_progress(
-        cloud_progress)
-    self.assertIsNotNone(reader_progress.percent_complete)
-    self.assertEqual(0.123, reader_progress.percent_complete)
-
-  def test_cloud_position_to_reader_position_byte_offset(self):
-    cloud_position = dataflow.Position()
-    cloud_position.byteOffset = 9999
-
-    reader_position = apiclient.cloud_position_to_reader_position(
-        cloud_position)
-    self.assertIsNotNone(reader_position)
-    self.assertIsInstance(reader_position, iobase.ReaderPosition)
-    self.assertEqual(9999, reader_position.byte_offset)
-
-  def test_approximate_progress_to_dynamic_split_request(self):
-    approximate_progress = dataflow.ApproximateProgress()
-    approximate_progress.percentComplete = 0.123
-
-    dynamic_split_request = (
-        apiclient.approximate_progress_to_dynamic_split_request(
-            approximate_progress))
-    self.assertIsNotNone(dynamic_split_request)
-    self.assertIsInstance(dynamic_split_request.progress, iobase.ReaderProgress)
-    self.assertIsNotNone(dynamic_split_request.progress.percent_complete)
-    self.assertEqual(dynamic_split_request.progress.percent_complete, 0.123)
-
+  def test_create_application_client(self):
+    pipeline_options = PipelineOptions()
+    apiclient.DataflowApplicationClient(
+        pipeline_options,
+        DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
 
 
 if __name__ == '__main__':
   unittest.main()