[ 
https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=332718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332718
 ]

ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Oct/19 17:46
            Start Date: 23/Oct/19 17:46
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #9741: [BEAM-7926] 
Visualize PCollection
URL: https://github.com/apache/beam/pull/9741#discussion_r338185724
 
 

 ##########
 File path: 
sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
 ##########
 @@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.runners.interactive.display.pcoll_visualization."""
+from __future__ import absolute_import
+
+import sys
+import time
+import unittest
+
+import apache_beam as beam  # pylint: disable=ungrouped-imports
+import timeloop
+from apache_beam.runners import runner
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.display import pcoll_visualization as pv
+
+# Work around nose tests using Python2 without unittest.mock module.
+try:
+  from unittest.mock import patch
+except ImportError:
+  from mock import patch
+
+
+class PCollVisualizationTest(unittest.TestCase):
+
+  def setUp(self):
+    self._p = beam.Pipeline()
+    # pylint: disable=range-builtin-not-iterating
+    self._pcoll = self._p | 'Create' >> beam.Create(range(1000))
+
+  @unittest.skipIf(sys.version_info < (3, 5, 3),
+                   'PCollVisualization is not supported on Python 2.')
+  def test_raise_error_for_non_pcoll_input(self):
+    class Foo(object):
+      pass
+
+    with self.assertRaises(AssertionError) as ctx:
+      pv.PCollVisualization(Foo())
+      self.assertTrue('pcoll should be apache_beam.pvalue.PCollection' in
+                      ctx.exception)
+
+  @unittest.skipIf(sys.version_info < (3, 5, 3),
+                   'PCollVisualization is not supported on Python 2.')
+  def test_pcoll_visualization_generate_unique_display_id(self):
+    pv_1 = pv.PCollVisualization(self._pcoll)
+    pv_2 = pv.PCollVisualization(self._pcoll)
+    self.assertNotEqual(pv_1._dive_display_id, pv_2._dive_display_id)
+    self.assertNotEqual(pv_1._overview_display_id, pv_2._overview_display_id)
+    self.assertNotEqual(pv_1._df_display_id, pv_2._df_display_id)
+
+  @unittest.skipIf(sys.version_info < (3, 5, 3),
+                   'PCollVisualization is not supported on Python 2.')
+  @patch('apache_beam.runners.interactive.display.pcoll_visualization'
+         '.PCollVisualization._to_element_list', lambda x: [1, 2, 3])
+  def test_one_shot_visualization_not_return_handle(self):
+    self.assertIsNone(pv.visualize(self._pcoll))
+
+  def _mock_to_element_list(self):
+    yield [1, 2, 3]
+    yield [1, 2, 3, 4]
+    yield [1, 2, 3, 4, 5]
+    yield [1, 2, 3, 4, 5, 6]
+    yield [1, 2, 3, 4, 5, 6, 7]
+    yield [1, 2, 3, 4, 5, 6, 7, 8]
+
+  @unittest.skipIf(sys.version_info < (3, 5, 3),
+                   'PCollVisualization is not supported on Python 2.')
+  @patch('apache_beam.runners.interactive.display.pcoll_visualization'
+         '.PCollVisualization._to_element_list', _mock_to_element_list)
+  def test_dynamic_plotting_return_handle(self):
+    h = pv.visualize(self._pcoll, dynamic_plotting_interval=1)
+    self.assertIsInstance(h, timeloop.Timeloop)
+    h.stop()
+
+  @unittest.skipIf(sys.version_info < (3, 5, 3),
+                   'PCollVisualization is not supported on Python 2.')
+  @patch('apache_beam.runners.interactive.display.pcoll_visualization'
+         '.PCollVisualization._to_element_list', _mock_to_element_list)
+  @patch('apache_beam.runners.interactive.display.pcoll_visualization'
+         '.PCollVisualization.display_facets')
+  def test_dynamic_plotting_update_same_display(self,
+                                                mocked_display_facets):
+    fake_pipeline_result = runner.PipelineResult(runner.PipelineState.RUNNING)
+    ie.current_env().set_pipeline_result(self._p, fake_pipeline_result)
+    # Starts async dynamic plotting that never ends in this test.
+    h = pv.visualize(self._pcoll, dynamic_plotting_interval=0.001)
+    # Blocking so the above async task can execute at least 3 iterations.
+    timeout = time.time() + 0.1
+    while len(mocked_display_facets.call_args_list) < 3:
+      if time.time() > timeout:
+        break
+    # The first iteration doesn't provide updating_pv to display_facets.
+    _, first_kwargs = mocked_display_facets.call_args_list[0]
+    self.assertEqual(first_kwargs, {})
+    # The following iterations use the same updating_pv to display_facets and so
+    # on.
+    _, second_kwargs = mocked_display_facets.call_args_list[1]
+    updating_pv = second_kwargs['updating_pv']
+    for call in mocked_display_facets.call_args_list[2:]:
+      _, kwargs = call
+      self.assertIs(kwargs['updating_pv'], updating_pv)
+    h.stop()
+
+  @unittest.skipIf(sys.version_info < (3, 6),
 
 Review comment:
   It would be useful to add your response as code comments here.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 332718)

> Visualize PCollection with Interactive Beam
> -------------------------------------------
>
>                 Key: BEAM-7926
>                 URL: https://issues.apache.org/jira/browse/BEAM-7926
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline is defined as
> p = create_pipeline()
> pcoll = p | 'Transform' >> transform()
> The user can call a single function and get auto-magical charting of the data 
> as materialized pcoll.
> e.g., visualize(pcoll)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to