rohdesamuel commented on a change in pull request #15647:
URL: https://github.com/apache/beam/pull/15647#discussion_r722455985
##########
File path: sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
##########
@@ -181,11 +196,22 @@ def beam_sql(self, line: str, cell: Optional[str] = None)
-> Optional[PValue]:
if not query:
on_error('Please supply the SQL query to be executed.')
return
+ if runner and runner not in _SUPPORTED_RUNNERS:
+ on_error(
+ 'Runner "%s" is not supported. Supported runners are %s.',
+ runner,
+ _SUPPORTED_RUNNERS)
query = ' '.join(query)
- found = find_pcolls(query, pcoll_by_name(), verbose=verbose)
+ found = find_pcolls(
+ query,
+ pcoll_by_name(),
+ run=runner in ('DirectRunner', None),
+ verbose=verbose)
+ schemas = set()
+ main_session = importlib.import_module('__main__')
Review comment:
Gotcha
##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
'{}: {}'.format(k, v.__name__) for k,
v in schema.__annotations__.items()
]))
+
+
+@dataclass
+class OptionsEntry:
+ """An entry of PipelineOptions that can be visualized through ipywidgets to
+ take inputs in IPython notebooks interactively.
+
+ Attributes:
+ label: The value of the Label widget.
+ help: The help message of the entry, usually the same as the help in
+ PipelineOptions.
+ cls: The PipelineOptions class/subclass the options belong to.
+ arg_builder: Builds the argument/option. If it's a str, this entry
+ assigns the input ipywidget's value directly to the argument. If it's a
+ Dict, use the corresponding Callable to assign the input value to each
+ argument. If the Callable is None, fall back to assigning the input value
+ directly. This allows building multiple similar PipelineOptions
+ arguments from a single input, such as staging_location and
+ temp_location in GoogleCloudOptions.
+ default: The default value of the entry, None if absent.
+ """
+ label: str
+ help: str
+ cls: Type[PipelineOptions]
+ arg_builder: Union[str, Dict[str, Optional[Callable]]]
+ default: Optional[str] = None
+
+ def __post_init__(self):
+ # The attribute holds an ipywidget, currently only supports Text.
+ # The str value can be accessed by self.input.value.
+ self.input = None
+
+
+class OptionsForm:
+ """A form visualized to take inputs from users in IPython Notebooks and
+ generate PipelineOptions to run pipelines.
+ """
+ def __init__(self):
+ self.options = PipelineOptions()
+ self.entries = []
+
+ def add(self, entry: OptionsEntry) -> 'OptionsForm':
+ """Adds an OptionsEntry to the form.
+ """
+ self.entries.append(entry)
+ return self
+
+ def to_options(self) -> PipelineOptions:
+ """Builds the PipelineOptions based on user inputs.
+
+ Can only be invoked after display_for_input.
+ """
+ for entry in self.entries:
+ assert entry.input, (
+ 'to_options invoked before display_for_input. '
+ 'Wrong usage.')
+ view = self.options.view_as(entry.cls)
+ if isinstance(entry.arg_builder, str):
+ setattr(view, entry.arg_builder, entry.input.value)
+ else:
+ for arg, builder in entry.arg_builder.items():
+ if builder:
+ setattr(view, arg, builder(entry.input.value))
+ else:
+ setattr(view, arg, entry.input.value)
+ self.additional_options()
+ return self.options
+
+ def additional_options(self):
+ """Alters the self.options with additional config."""
+ pass
Review comment:
Cool, thank you
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]