KevinGG commented on a change in pull request #15647:
URL: https://github.com/apache/beam/pull/15647#discussion_r721834235



##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
           '{}: {}'.format(k, v.__name__) for k,
           v in schema.__annotations__.items()
       ]))
+
+
+@dataclass
+class OptionsEntry:
+  """An entry of PipelineOptions that can be visualized through ipywidgets to
+  take inputs in IPython notebooks interactively.
+
+  Attributes:
+    label: The value of the Label widget.
+    help: The help message of the entry, usually the same to the help in
+      PipelineOptions.
+    cls: The PipelineOptions class/subclass the options belong to.
+    arg_builder: Builds the argument/option. If it's a str, this entry
+      assigns the input ipywidget's value directly to the argument. If it's a
+      Dict, use the corresponding Callable to assign the input value to each
+      argument. If Callable is None, fallback to assign the input value
+      directly. This allows building multiple similar PipelineOptions
+      arguments from a single input, such as staging_location and
+      temp_location in GoogleCloudOptions.
+    default: The default value of the entry, None if absent.
+  """
+  label: str
+  help: str
+  cls: Type[PipelineOptions]
+  arg_builder: Union[str, Dict[str, Optional[Callable]]]
+  default: Optional[str] = None
+
+  def __post_init__(self):
+    # The attribute holds an ipywidget, currently only supports Text.
+    # The str value can be accessed by self.input.value.
+    self.input = None
+
+
+class OptionsForm:
+  """A form visualized to take inputs from users in IPython Notebooks and
+  generate PipelineOptions to run pipelines.
+  """
+  def __init__(self):
+    self.options = PipelineOptions()
+    self.entries = []
+
+  def add(self, entry: OptionsEntry) -> 'OptionsForm':
+    """Adds an OptionsEntry to the form.
+    """
+    self.entries.append(entry)
+    return self
+
+  def to_options(self) -> PipelineOptions:
+    """Builds the PipelineOptions based on user inputs.
+
+    Can only be invoked after display_for_input.
+    """
+    for entry in self.entries:
+      assert entry.input, (
+          'to_options invoked before display_for_input. '
+          'Wrong usage.')
+      view = self.options.view_as(entry.cls)
+      if isinstance(entry.arg_builder, str):
+        setattr(view, entry.arg_builder, entry.input.value)
+      else:
+        for arg, builder in entry.arg_builder.items():
+          if builder:
+            setattr(view, arg, builder(entry.input.value))
+          else:
+            setattr(view, arg, entry.input.value)
+    self.additional_options()
+    return self.options
+
+  def additional_options(self):
+    """Alters the self.options with additional config."""
+    pass
+
+  def display_for_input(self) -> 'OptionsForm':
+    """Displays the widgets to take user inputs."""
+    from IPython.display import display
+    from ipywidgets import GridBox
+    from ipywidgets import Label
+    from ipywidgets import Layout
+    from ipywidgets import Text
+    widgets = []
+    for entry in self.entries:
+      text_label = Label(value=entry.label)
+      text_input = entry.input if entry.input else Text(
+          value=entry.default if entry.default else '')
+      text_help = Label(value=entry.help)
+      entry.input = text_input
+      widgets.append(text_label)
+      widgets.append(text_input)
+      widgets.append(text_help)
+    grid = GridBox(widgets, layout=Layout(grid_template_columns='1fr 2fr 6fr'))
+    display(grid)
+    self.display_actions()
+    return self
+
+  def display_actions(self):
+    """Displays actionable widgets to utilize the options, run pipelines and
+    etc."""
+    pass
+
+
+class DataflowOptionsForm(OptionsForm):
+  """A form to take inputs from users in IPython Notebooks to build
+  PipelineOptions to run pipelines on Dataflow.
+
+  Only contains minimum fields needed.
+  """
+  @staticmethod
+  def _build_default_project() -> str:
+    """Builds a default project id."""
+    try:
+      # pylint: disable=c-extension-no-member
+      import google.auth
+      return google.auth.default()[1]
+    except (KeyboardInterrupt, SystemExit):
+      raise
+    except Exception as e:
+      _LOGGER.warning('There is some issue with your gcloud auth: %s', e)
+      return 'your-project-id'
+
+  @staticmethod
+  def _build_req_file_from_pkgs(pkgs) -> Optional[str]:
+    """Builds a requirements file that contains all additional PYPI packages
+    needed."""
+    if pkgs:
+      deps = pkgs.split()
+      req_file = os.path.join(
+          tempfile.mkdtemp(prefix='beam-sql-dataflow-'), 'req.txt')
+      with open(req_file, 'a') as f:
+        for dep in deps:
+          f.write(dep + '\n')
+      return req_file
+    return None
+
+  def __init__(
+      self,
+      output_name: str,
+      output_pcoll: beam.PCollection,
+      verbose: bool = False):
+    """Inits the OptionsForm for setting up Dataflow jobs."""
+    super().__init__()
+    self.p = output_pcoll.pipeline
+    self.output_name = output_name
+    self.output_pcoll = output_pcoll
+    self.verbose = verbose
+    self.notice_shown = False
+    self.add(
+        OptionsEntry(
+            label='Project Id',
+            help='Name of the Cloud project owning the Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='project',
+            default=DataflowOptionsForm._build_default_project())
+    ).add(
+        OptionsEntry(
+            label='Region',
+            help='The Google Compute Engine region for creating Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='region',
+            default='us-central1')
+    ).add(
+        OptionsEntry(
+            label='GCS Bucket',
+            help=(
+                'GCS path to stage code packages needed by workers and save '
+                'temporary workflow jobs.'),
+            cls=GoogleCloudOptions,
+            arg_builder={
+                'staging_location': lambda x: x + '/staging',
+                'temp_location': lambda x: x + '/temp'
+            },
+            default='gs://YOUR_GCS_BUCKET_HERE')
+    ).add(
+        OptionsEntry(
+            label='Additional Packages',
+            help=(
+                'PYPI packages installed, whitespace separated. If None, leave 
'
+                'this field empty.'),
+            cls=SetupOptions,
+            arg_builder={
+                'requirements_file': lambda x: DataflowOptionsForm.
+                _build_req_file_from_pkgs(x)
+            },
+            default=''))
+
+  def additional_options(self):
+    # Use the latest Java SDK by default.
+    sdk_overrides = self.options.view_as(
+        WorkerOptions).sdk_harness_container_image_overrides
+    override = '.*java.*,apache/beam_java11_sdk:latest'
+    if sdk_overrides and override not in sdk_overrides:
+      sdk_overrides.append(override)
+    else:
+      self.options.view_as(
+          WorkerOptions).sdk_harness_container_image_overrides = [override]
+
+  def display_actions(self):
+    from IPython.display import HTML
+    from IPython.display import display
+    from ipywidgets import Button
+    from ipywidgets import GridBox
+    from ipywidgets import Layout
+    from ipywidgets import Output
+    output_area = Output()
+    run_btn = Button(
+        description='Run on Dataflow',
+        button_style='success',
+        tooltip=(
+            'Submit to Dataflow for execution with the configured options. The 
'
+            'output PCollection\'s data will be written to the GCS bucket you '
+            'configure.'))
+    show_options_btn = Button(
+        description='Show Options',
+        button_style='info',
+        tooltip='Show current pipeline options configured.')
+
+    def _run_on_dataflow(btn):
+      with output_area:
+
+        @progress_indicated
+        def _inner():
+          options = self.to_options()
+          # Caches the output_pcoll to a GCS bucket.
+          try:
+            output_location = '{}/{}'.format(
+                options.view_as(GoogleCloudOptions).staging_location,
+                self.output_name)
+            _ = self.output_pcoll | 'WriteOuput{}ToGCS'.format(
+                self.output_name) >> WriteToText(output_location)
+            _LOGGER.info(
+                'Data of output PCollection %s will be written to %s',
+                self.output_name,
+                output_location)
+          except (KeyboardInterrupt, SystemExit):
+            raise
+          except:  # pylint: disable=bare-except
+            # The transform has been added before, noop.
+            pass
+          if self.verbose:
+            _LOGGER.info(
+                'Running the pipeline on Dataflow with pipeline options %s.',
+                options.display_data())

Review comment:
       Also separated the output areas of the two buttons, and now always clear
       the output areas before displaying new content to avoid showing outdated
       information.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to