KevinGG commented on a change in pull request #15647:
URL: https://github.com/apache/beam/pull/15647#discussion_r721808387
##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
'{}: {}'.format(k, v.__name__) for k,
v in schema.__annotations__.items()
]))
+
+
+@dataclass
+class OptionsEntry:
+ """An entry of PipelineOptions that can be visualized through ipywidgets to
+ take inputs in IPython notebooks interactively.
+
+ Attributes:
+ label: The value of the Label widget.
+ help: The help message of the entry, usually the same as the help in
+ PipelineOptions.
+ cls: The PipelineOptions class/subclass the options belong to.
+ arg_builder: Builds the argument/option. If it's a str, this entry
+ assigns the input ipywidget's value directly to the argument. If it's a
+ Dict, use the corresponding Callable to assign the input value to each
+ argument. If Callable is None, fall back to assigning the input value
+ directly. This allows building multiple similar PipelineOptions
+ arguments from a single input, such as staging_location and
+ temp_location in GoogleCloudOptions.
+ default: The default value of the entry, None if absent.
+ """
+ label: str
+ help: str
+ cls: Type[PipelineOptions]
+ arg_builder: Union[str, Dict[str, Optional[Callable]]]
+ default: Optional[str] = None
+
+ def __post_init__(self):
+ # The attribute holds an ipywidget, currently only supports Text.
+ # The str value can be accessed by self.input.value.
+ self.input = None
+
+
+class OptionsForm:
+ """A form visualized to take inputs from users in IPython Notebooks and
+ generate PipelineOptions to run pipelines.
+ """
+ def __init__(self):
+ self.options = PipelineOptions()
+ self.entries = []
+
+ def add(self, entry: OptionsEntry) -> 'OptionsForm':
+ """Adds an OptionsEntry to the form.
+ """
+ self.entries.append(entry)
+ return self
+
+ def to_options(self) -> PipelineOptions:
+ """Builds the PipelineOptions based on user inputs.
+
+ Can only be invoked after display_for_input.
+ """
+ for entry in self.entries:
+ assert entry.input, (
+ 'to_options invoked before display_for_input. '
+ 'Wrong usage.')
+ view = self.options.view_as(entry.cls)
+ if isinstance(entry.arg_builder, str):
+ setattr(view, entry.arg_builder, entry.input.value)
+ else:
+ for arg, builder in entry.arg_builder.items():
+ if builder:
+ setattr(view, arg, builder(entry.input.value))
+ else:
+ setattr(view, arg, entry.input.value)
+ self.additional_options()
+ return self.options
+
+ def additional_options(self):
+ """Alters the self.options with additional config."""
+ pass
+
+ def display_for_input(self) -> 'OptionsForm':
+ """Displays the widgets to take user inputs."""
+ from IPython.display import display
+ from ipywidgets import GridBox
+ from ipywidgets import Label
+ from ipywidgets import Layout
+ from ipywidgets import Text
+ widgets = []
+ for entry in self.entries:
+ text_label = Label(value=entry.label)
+ text_input = entry.input if entry.input else Text(
+ value=entry.default if entry.default else '')
+ text_help = Label(value=entry.help)
+ entry.input = text_input
+ widgets.append(text_label)
+ widgets.append(text_input)
+ widgets.append(text_help)
+ grid = GridBox(widgets, layout=Layout(grid_template_columns='1fr 2fr 6fr'))
+ display(grid)
+ self.display_actions()
+ return self
+
+ def display_actions(self):
+ """Displays actionable widgets to utilize the options, run pipelines,
+ etc."""
+ pass
+
+
+class DataflowOptionsForm(OptionsForm):
+ """A form to take inputs from users in IPython Notebooks to build
+ PipelineOptions to run pipelines on Dataflow.
+
+ Only contains minimum fields needed.
+ """
+ @staticmethod
+ def _build_default_project() -> str:
+ """Builds a default project id."""
+ try:
+ # pylint: disable=c-extension-no-member
+ import google.auth
+ return google.auth.default()[1]
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception as e:
+ _LOGGER.warning('There is some issue with your gcloud auth: %s', e)
+ return 'your-project-id'
+
+ @staticmethod
+ def _build_req_file_from_pkgs(pkgs) -> Optional[str]:
+ """Builds a requirements file that contains all additional PYPI packages
+ needed."""
+ if pkgs:
+ deps = pkgs.split()
+ req_file = os.path.join(
+ tempfile.mkdtemp(prefix='beam-sql-dataflow-'), 'req.txt')
+ with open(req_file, 'a') as f:
+ for dep in deps:
+ f.write(dep + '\n')
+ return req_file
+ return None
+
+ def __init__(
+ self,
+ output_name: str,
+ output_pcoll: beam.PCollection,
+ verbose: bool = False):
+ """Inits the OptionsForm for setting up Dataflow jobs."""
+ super().__init__()
+ self.p = output_pcoll.pipeline
+ self.output_name = output_name
+ self.output_pcoll = output_pcoll
+ self.verbose = verbose
+ self.notice_shown = False
+ self.add(
+ OptionsEntry(
+ label='Project Id',
+ help='Name of the Cloud project owning the Dataflow job.',
+ cls=GoogleCloudOptions,
+ arg_builder='project',
+ default=DataflowOptionsForm._build_default_project())
+ ).add(
+ OptionsEntry(
+ label='Region',
+ help='The Google Compute Engine region for creating Dataflow job.',
+ cls=GoogleCloudOptions,
+ arg_builder='region',
+ default='us-central1')
+ ).add(
+ OptionsEntry(
+ label='GCS Bucket',
+ help=(
+ 'GCS path to stage code packages needed by workers and save '
+ 'temporary workflow jobs.'),
+ cls=GoogleCloudOptions,
+ arg_builder={
+ 'staging_location': lambda x: x + '/staging',
+ 'temp_location': lambda x: x + '/temp'
+ },
+ default='gs://YOUR_GCS_BUCKET_HERE')
+ ).add(
+ OptionsEntry(
+ label='Additional Packages',
+ help=(
+ 'PYPI packages installed, whitespace separated. If None, leave '
+ 'this field empty.'),
Review comment:
Yes, change it to comma-separated, since comma and whitespace are both
legitimate delimiters for PyPI packages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]