This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06a206a  [BEAM-7760] Added Interactive Beam module
     new 6e9675b  Merge pull request #9278 from KevinGG/master
06a206a is described below

commit 06a206ac2c23cb5109eaac609296abdd43c5b200
Author: NING KANG <[email protected]>
AuthorDate: Thu Aug 22 13:59:14 2019 -0700

    [BEAM-7760] Added Interactive Beam module
    
    1. Added interactive_beam module that will provide syntactic sugar and
    shorthand functions to apply interactivity and visualize
    PCollection data.
    2. This commit implemented the implicitly managed Interactive Beam
    environment to track definition of user pipelines. It exposed a watch()
    interface for users to explicitly tell Interactive Beam the
    whereabouts of their pipeline definition when it's not in __main__.
    3. This commit exposed a PCollection data exploration interface
    visualize(). Implementation is yet to be added.
    4. Added interactive_environment module for internal usage without
    backward-compatibility. It holds the cache manager and watchable
    metadata for current interactive environment/session/context. Interfaces
    are provided to interact with the environment and its components.
    5. Unit tests included.
---
 .../runners/interactive/interactive_beam.py        |  86 +++++++++++++++++
 .../runners/interactive/interactive_beam_test.py   |  71 ++++++++++++++
 .../runners/interactive/interactive_environment.py | 107 +++++++++++++++++++++
 .../interactive/interactive_environment_test.py    |  93 ++++++++++++++++++
 4 files changed, 357 insertions(+)

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
new file mode 100644
index 0000000..a7a7584
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of Interactive Beam features that can be used in notebook.
+
+The purpose of the module is to reduce the learning curve of Interactive Beam
+users, provide a single place for importing, and add syntactic sugar for all
+Interactive Beam components. It gives users the capability to interact with
+the existing environment/session/context for Interactive Beam and to visualize
+PCollections as bounded datasets. In the meantime, it hides the interactivity
+implementation from users so that they can focus on developing Beam pipelines
+without worrying about how hidden states in the interactive session are
+managed.
+
+Note: If you want backward-compatibility, only invoke interfaces provided by
+this module in your notebook or application code.
+"""
+from __future__ import absolute_import
+
+from apache_beam.runners.interactive import interactive_environment as ie
+
+
+def watch(watchable):
+  """Monitors a watchable.
+
+  This allows Interactive Beam to implicitly pass on the information about the
+  location of your pipeline definition.
+
+  Current implementation mainly watches for PCollection variables defined in
+  user code. A watchable can be a dictionary of variable metadata such as
+  locals(), a str name of a module, a module object or an instance of a class.
+  The variable can come from any scope even local variables in a method of a
+  class defined in a module.
+
+    Below are all valid::
+
+      watch(__main__)  # if import __main__ is already invoked
+      watch('__main__')  # does not require invoking import __main__ beforehand
+      watch(self)  # inside a class
+      watch(SomeInstance())  # an instance of a class
+      watch(locals())  # inside a function, watching local variables within
+
+  If you write a Beam pipeline in the __main__ module directly, since the
+  __main__ module is always watched, you don't have to instruct Interactive
+  Beam. If your Beam pipeline is defined in some module other than __main__,
+  such as inside a class function or a unit test, you can watch() the scope.
+
+    For example::
+
+      class Foo(object)
+        def run_pipeline(self):
+          p = beam.Pipeline()
+          init_pcoll = p |  'Init Create' >> beam.Create(range(10))
+          watch(locals())
+          p.run()
+          return init_pcoll
+      init_pcoll = Foo().run_pipeline()
+
+    Interactive Beam caches init_pcoll for the first run.
+
+    Then you can use::
+
+      visualize(init_pcoll)
+
+    To visualize data from init_pcoll once the pipeline is executed.
+  """
+  ie.current_env().watch(watchable)
+
+
+def visualize(pcoll):
+  """Visualizes a PCollection."""
+  # TODO(BEAM-7926)
+  pass
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
new file mode 100644
index 0000000..7660b1a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.runners.interactive.interactive_beam."""
+from __future__ import absolute_import
+
+import importlib
+import unittest
+
+from apache_beam.runners.interactive import interactive_beam as ib
+from apache_beam.runners.interactive import interactive_environment as ie
+
+# The module name is also a variable in module.
+_module_name = 'apache_beam.runners.interactive.interactive_beam_test'
+
+
+class InteractiveBeamTest(unittest.TestCase):
+
+  def setUp(self):
+    self._var_in_class_instance = 'a var in class instance, not directly used'
+    ie.new_env()
+
+  def test_watch_main_by_default(self):
+    test_env = ie.InteractiveEnvironment()
+    # Current Interactive Beam env fetched and the test env are 2 instances.
+    self.assertNotEqual(id(ie.current_env()), id(test_env))
+    self.assertEqual(ie.current_env().watching(), test_env.watching())
+
+  def test_watch_a_module_by_name(self):
+    test_env = ie.InteractiveEnvironment()
+    ib.watch(_module_name)
+    test_env.watch(_module_name)
+    self.assertEqual(ie.current_env().watching(), test_env.watching())
+
+  def test_watch_a_module_by_module_object(self):
+    test_env = ie.InteractiveEnvironment()
+    module = importlib.import_module(_module_name)
+    ib.watch(module)
+    test_env.watch(module)
+    self.assertEqual(ie.current_env().watching(), test_env.watching())
+
+  def test_watch_locals(self):
+    # test_env serves as local var too.
+    test_env = ie.InteractiveEnvironment()
+    ib.watch(locals())
+    test_env.watch(locals())
+    self.assertEqual(ie.current_env().watching(), test_env.watching())
+
+  def test_watch_class_instance(self):
+    test_env = ie.InteractiveEnvironment()
+    ib.watch(self)
+    test_env.watch(self)
+    self.assertEqual(ie.current_env().watching(), test_env.watching())
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
new file mode 100644
index 0000000..3dee1e3
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of the current Interactive Beam environment.
+
+For internal use only; no backwards-compatibility guarantees.
+Provides interfaces to interact with existing Interactive Beam environment.
+External Interactive Beam users please use interactive_beam module in
+application code or notebook.
+"""
+from __future__ import absolute_import
+
+import importlib
+
+_interactive_beam_env = None
+
+
+def current_env(cache_manager=None):
+  """Gets current Interactive Beam environment."""
+  global _interactive_beam_env
+  if not _interactive_beam_env:
+    _interactive_beam_env = InteractiveEnvironment(cache_manager)
+  return _interactive_beam_env
+
+
+def new_env(cache_manager=None):
+  """Creates a new Interactive Beam environment to replace current one."""
+  global _interactive_beam_env
+  _interactive_beam_env = None
+  return current_env(cache_manager)
+
+
+class InteractiveEnvironment(object):
+  """An interactive environment with cache and pipeline variable metadata.
+
+  Interactive Beam will use the watched variable information to determine if a
+  PCollection is assigned to a variable in user pipeline definition. When
+  executing the pipeline, interactivity is applied with implicit cache
+  mechanism for those PCollections if the pipeline is interactive. Users can
+  also visualize and introspect those PCollections in user code since they have
+  handles to the variables.
+  """
+
+  def __init__(self, cache_manager=None):
+    self._cache_manager = cache_manager
+    # Holds class instances, module object, string of module names.
+    self._watching_set = set()
+    # Holds variables list of (Dict[str, object]).
+    self._watching_dict_list = []
+    # Always watch __main__ module.
+    self.watch('__main__')
+
+  def watch(self, watchable):
+    """Watches a watchable.
+
+    A watchable can be a dictionary of variable metadata such as locals(), a 
str
+    name of a module, a module object or an instance of a class. The variable
+    can come from any scope even local. Duplicated variable naming doesn't
+    matter since they are different instances. Duplicated variables are also
+    allowed when watching.
+    """
+    if isinstance(watchable, dict):
+      self._watching_dict_list.append(watchable.items())
+    else:
+      self._watching_set.add(watchable)
+
+  def watching(self):
+    """Analyzes and returns a list of pair lists referring to variable names 
and
+    values from watched scopes.
+
+    Each entry in the list represents the variable defined within a watched
+    watchable. Currently, each entry holds a list of pairs. The format might
+    change in the future to hold more metadata. Duplicated pairs are allowed.
+    And multiple paris can have the same variable name as the "first" while
+    having different variable values as the "second" since variables in
+    different scopes can have the same name.
+    """
+    watching = list(self._watching_dict_list)
+    for watchable in self._watching_set:
+      if isinstance(watchable, str):
+        module = importlib.import_module(watchable)
+        watching.append(vars(module).items())
+      else:
+        watching.append(vars(watchable).items())
+    return watching
+
+  def set_cache_manager(self, cache_manager):
+    """Sets the cache manager held by current Interactive Environment."""
+    self._cache_manager = cache_manager
+
+  def cache_manager(self):
+    """Gets the cache manager held by current Interactive Environment."""
+    return self._cache_manager
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
new file mode 100644
index 0000000..95bb163
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.runners.interactive.interactive_environment."""
+from __future__ import absolute_import
+
+import importlib
+import unittest
+
+from apache_beam.runners.interactive import interactive_environment as ie
+
+# The module name is also a variable in module.
+_module_name = 'apache_beam.runners.interactive.interactive_environment_test'
+
+
+class InteractiveEnvironmentTest(unittest.TestCase):
+
+  def setUp(self):
+    self._var_in_class_instance = 'a var in class instance'
+    ie.new_env()
+
+  def assertVariableWatched(self, variable_name, variable_val):
+    self.assertTrue(self._is_variable_watched(variable_name, variable_val))
+
+  def assertVariableNotWatched(self, variable_name, variable_val):
+    self.assertFalse(self._is_variable_watched(variable_name, variable_val))
+
+  def _is_variable_watched(self, variable_name, variable_val):
+    return any([(variable_name, variable_val) in watching for watching in
+                ie.current_env().watching()])
+
+  def _a_function_with_local_watched(self):
+    local_var_watched = 123  # pylint: disable=unused-variable
+    ie.current_env().watch(locals())
+
+  def _a_function_not_watching_local(self):
+    local_var_not_watched = 456  # pylint: disable=unused-variable
+
+  def test_watch_main_by_default(self):
+    self.assertTrue('__main__' in ie.current_env()._watching_set)
+    # __main__ module has variable __name__ with value '__main__'
+    self.assertVariableWatched('__name__', '__main__')
+
+  def test_watch_a_module_by_name(self):
+    self.assertFalse(
+        _module_name in ie.current_env()._watching_set)
+    self.assertVariableNotWatched('_module_name', _module_name)
+    ie.current_env().watch(_module_name)
+    self.assertTrue(
+        _module_name in
+        ie.current_env()._watching_set)
+    self.assertVariableWatched('_module_name', _module_name)
+
+  def test_watch_a_module_by_module_object(self):
+    module = importlib.import_module(_module_name)
+    self.assertFalse(module in ie.current_env()._watching_set)
+    self.assertVariableNotWatched('_module_name', _module_name)
+    ie.current_env().watch(module)
+    self.assertTrue(module in ie.current_env()._watching_set)
+    self.assertVariableWatched('_module_name', _module_name)
+
+  def test_watch_locals(self):
+    self.assertVariableNotWatched('local_var_watched', 123)
+    self.assertVariableNotWatched('local_var_not_watched', 456)
+    self._a_function_with_local_watched()
+    self.assertVariableWatched('local_var_watched', 123)
+    self._a_function_not_watching_local()
+    self.assertVariableNotWatched('local_var_not_watched', 456)
+
+  def test_watch_class_instance(self):
+    self.assertVariableNotWatched('_var_in_class_instance',
+                                  self._var_in_class_instance)
+    ie.current_env().watch(self)
+    self.assertVariableWatched('_var_in_class_instance',
+                               self._var_in_class_instance)
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to