This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch revert-28512-display-data in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3844972d681a317c1281685edc5f6fc8783c8043 Author: Robert Bradshaw <[email protected]> AuthorDate: Mon Oct 9 16:15:10 2023 -0700 Revert "Populate top-level display data in yaml main. (#28512)" This reverts commit 2bbb3485c78ede3c4acddd462158814157f2b46f. --- sdks/python/apache_beam/pipeline.py | 28 ++++++--------------------- sdks/python/apache_beam/transforms/display.py | 23 ++++++++-------------- sdks/python/apache_beam/yaml/main.py | 17 +++++++--------- 3 files changed, 21 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 14177cd603d..042b483d50f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -88,7 +88,6 @@ from apache_beam.runners import create_runner from apache_beam.transforms import ParDo from apache_beam.transforms import ptransform from apache_beam.transforms.display import DisplayData -from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.resources import merge_resource_hints from apache_beam.transforms.resources import resource_hints_from_options from apache_beam.transforms.sideinputs import get_sideinput_index @@ -109,7 +108,7 @@ if TYPE_CHECKING: __all__ = ['Pipeline', 'PTransformOverride'] -class Pipeline(HasDisplayData): +class Pipeline(object): """A pipeline object that manages a DAG of :class:`~apache_beam.pvalue.PValue` s and their :class:`~apache_beam.transforms.ptransform.PTransform` s. @@ -134,12 +133,9 @@ class Pipeline(HasDisplayData): common_urns.primitives.IMPULSE.urn, ]) - def __init__( - self, - runner: Optional[Union[str, PipelineRunner]] = None, - options: Optional[PipelineOptions] = None, - argv: Optional[List[str]] = None, - display_data: Optional[Dict[str, Any]] = None): + def __init__(self, runner=None, options=None, argv=None): + # type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None + """Initialize a pipeline object. 
Args: @@ -155,8 +151,6 @@ class Pipeline(HasDisplayData): to be used for building a :class:`~apache_beam.options.pipeline_options.PipelineOptions` object. This will only be used if argument **options** is :data:`None`. - display_data (Dict[str: Any]): a dictionary of static data associated - with this pipeline that can be displayed when it runs. Raises: ValueError: if either the runner or options argument is not @@ -239,11 +233,6 @@ class Pipeline(HasDisplayData): # Records whether this pipeline contains any external transforms. self.contains_external_transforms = False - self._display_data = display_data or {} - - def display_data(self): - # type: () -> Dict[str, Any] - return self._display_data @property # type: ignore[misc] # decorated property not supported def options(self): @@ -925,8 +914,7 @@ class Pipeline(HasDisplayData): proto = beam_runner_api_pb2.Pipeline( root_transform_ids=[root_transform_id], components=context.to_runner_api(), - requirements=context.requirements(), - display_data=DisplayData('', self._display_data).to_proto()) + requirements=context.requirements()) proto.components.transforms[root_transform_id].unique_name = ( root_transform_id) self.merge_compatible_environments(proto) @@ -982,11 +970,7 @@ class Pipeline(HasDisplayData): # type: (...) 
-> Pipeline """For internal use only; no backwards-compatibility guarantees.""" - p = Pipeline( - runner=runner, - options=options, - display_data={str(ix): d - for ix, d in enumerate(proto.display_data)}) + p = Pipeline(runner=runner, options=options) from apache_beam.runners import pipeline_context context = pipeline_context.PipelineContext( proto.components, requirements=proto.requirements) diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 0d1dd552413..b52a8fd5b6d 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -45,7 +45,6 @@ from datetime import datetime from datetime import timedelta from typing import TYPE_CHECKING from typing import List -from typing import Union from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 @@ -102,8 +101,7 @@ class DisplayData(object): ): # type: (...) -> None self.namespace = namespace - self.items = [ - ] # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]] + self.items = [] # type: List[DisplayDataItem] self._populate_items(display_data_dict) def _populate_items(self, display_data_dict): @@ -114,31 +112,26 @@ class DisplayData(object): subcomponent_display_data = DisplayData( element._get_display_data_namespace(), element.display_data()) self.items += subcomponent_display_data.items + continue - elif isinstance(element, DisplayDataItem): + if isinstance(element, DisplayDataItem): if element.should_drop(): continue element.key = key element.namespace = self.namespace self.items.append(element) + continue - elif isinstance(element, beam_runner_api_pb2.DisplayData): - self.items.append(element) - - else: - # If it's not a HasDisplayData element, - # nor a dictionary, then it's a simple value - self.items.append( - DisplayDataItem(element, namespace=self.namespace, key=key)) + # If it's not a HasDisplayData element, + # nor a 
dictionary, then it's a simple value + self.items.append( + DisplayDataItem(element, namespace=self.namespace, key=key)) def to_proto(self): # type: (...) -> List[beam_runner_api_pb2.DisplayData] """Returns a List of Beam proto representation of Display data.""" def create_payload(dd): - if isinstance(dd, beam_runner_api_pb2.DisplayData): - return dd - display_data_dict = None try: display_data_dict = dd.get_dict() diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py index e2ec8df9cfc..eb0695f337b 100644 --- a/sdks/python/apache_beam/yaml/main.py +++ b/sdks/python/apache_beam/yaml/main.py @@ -51,22 +51,19 @@ def _pipeline_spec_from_args(known_args): raise ValueError( "Exactly one of pipeline_spec or pipeline_spec_file must be set.") - return pipeline_yaml + return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) def run(argv=None): yaml_transform._LOGGER.setLevel('INFO') known_args, pipeline_args = _configure_parser(argv) - pipeline_yaml = _pipeline_spec_from_args(known_args) - pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader) + pipeline_spec = _pipeline_spec_from_args(known_args) - with beam.Pipeline( # linebreak for better yapf formatting - options=beam.options.pipeline_options.PipelineOptions( - pipeline_args, - pickle_library='cloudpickle', - **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( - 'options', {}))), - display_data={'yaml': pipeline_yaml}) as p: + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pipeline_args, + pickle_library='cloudpickle', + **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( + 'options', {})))) as p: print("Building pipeline...") yaml_transform.expand_pipeline(p, pipeline_spec) print("Running pipeline...")
