This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 61f0b34  [BEAM-12246] Fix ib.collect(dataframe) indexing (#14778)
61f0b34 is described below

commit 61f0b3449f57cafb19377ba89ad2fac101f514a9
Author: Sam sam <[email protected]>
AuthorDate: Fri May 21 15:23:27 2021 -0700

    [BEAM-12246] Fix ib.collect(dataframe) indexing (#14778)
    
    * [BEAM-12246] Fix ib.collect(dataframe) indexing
    
    * undo commenting out progress_indicated
    
    * remove reset_unnamed_index
    
    * update type for elements_to_df
    
    * Add todos
---
 .../runners/interactive/interactive_beam.py        |  21 +++-
 .../runners/interactive/interactive_runner_test.py | 119 ++++++++++++++++++++-
 .../interactive/options/capture_limiters.py        |  10 +-
 .../interactive/options/capture_limiters_test.py   |  16 +++
 .../runners/interactive/recording_manager.py       |  10 +-
 .../apache_beam/runners/interactive/utils.py       |  10 +-
 .../apache_beam/runners/interactive/utils_test.py  |  61 ++++++++---
 7 files changed, 219 insertions(+), 28 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py 
b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index fd42053..8db1037 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -527,6 +527,8 @@ def collect(pcoll, n='inf', duration='inf', 
include_window_info=False):
     n: (optional) max number of elements to visualize. Default 'inf'.
     duration: (optional) max duration of elements to read in integer seconds or
         a string duration. Default 'inf'.
+    include_window_info: (optional) if True, appends the windowing information
+        to each row. Default False.
 
   For example::
 
@@ -537,8 +539,18 @@ def collect(pcoll, n='inf', duration='inf', 
include_window_info=False):
     # Run the pipeline and bring the PCollection into memory as a Dataframe.
     in_memory_square = head(square, n=5)
   """
+  # Remember the element type so we can make an informed decision on how to
+  # collect the result in elements_to_df.
   if isinstance(pcoll, DeferredBase):
-    pcoll = to_pcollection(pcoll)
+    # Get the proxy so we can get the output shape of the DataFrame.
+    # TODO(BEAM-11064): Once type hints are implemented for pandas, use those
+    # instead of the proxy.
+    element_type = pcoll._expr.proxy()
+    pcoll = to_pcollection(
+        pcoll, yield_elements='pandas', label=str(pcoll._expr))
+    watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
+  else:
+    element_type = pcoll.element_type
 
   assert isinstance(pcoll, beam.pvalue.PCollection), (
       '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
@@ -572,10 +584,15 @@ def collect(pcoll, n='inf', duration='inf', 
include_window_info=False):
     recording.cancel()
     return pd.DataFrame()
 
+  if n == float('inf'):
+    n = None
+
+  # Collecting DataFrames may have a length > n, so slice again to be sure. 
Note
+  # that array[:None] returns everything.
   return elements_to_df(
       elements,
       include_window_info=include_window_info,
-      element_type=pcoll.element_type)
+      element_type=element_type)[:n]
 
 
 @progress_indicated
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py 
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 27f1350..fe6989e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,13 +24,13 @@ This module is experimental. No backwards-compatibility 
guarantees.
 
 import sys
 import unittest
+from typing import NamedTuple
 from unittest.mock import patch
 
 import pandas as pd
 
 import apache_beam as beam
 from apache_beam.dataframe.convert import to_dataframe
-from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
@@ -54,6 +54,12 @@ def print_with_message(msg):
   return printer
 
 
+class Record(NamedTuple):
+  name: str
+  age: int
+  height: int
+
+
 class InteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
   def test_basic(self):
@@ -285,7 +291,6 @@ class InteractiveRunnerTest(unittest.TestCase):
     data = p | beam.Create(
         [1, 2, 3]) | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x))
     df = to_dataframe(data)
-    pcoll = to_pcollection(df)
 
     # Watch the local scope for Interactive Beam so that values will be cached.
     ib.watch(locals())
@@ -295,9 +300,113 @@ class InteractiveRunnerTest(unittest.TestCase):
     ie.current_env().track_user_pipelines()
 
     df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
-    pd.testing.assert_frame_equal(df_expected, ib.collect(data, n=10))
-    pd.testing.assert_frame_equal(df_expected, ib.collect(df, n=10))
-    pd.testing.assert_frame_equal(df_expected, ib.collect(pcoll, n=10))
+    pd.testing.assert_frame_equal(
+        df_expected, ib.collect(df, n=10).reset_index(drop=True))
+
+  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+  def test_dataframes_with_grouped_index(self):
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(
+            direct_runner.DirectRunner()))
+
+    data = [
+        Record('a', 20, 170),
+        Record('a', 30, 170),
+        Record('b', 22, 180),
+        Record('c', 18, 150)
+    ]
+
+    aggregate = lambda df: df.groupby('height').mean()
+
+    deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+    df_expected = aggregate(pd.DataFrame(data))
+
+    # Watch the local scope for Interactive Beam so that values will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))
+
+  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+  def test_dataframes_with_multi_index(self):
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(
+            direct_runner.DirectRunner()))
+
+    data = [
+        Record('a', 20, 170),
+        Record('a', 30, 170),
+        Record('b', 22, 180),
+        Record('c', 18, 150)
+    ]
+
+    aggregate = lambda df: df.groupby(['name', 'height']).mean()
+
+    deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+    df_expected = aggregate(pd.DataFrame(data))
+
+    # Watch the local scope for Interactive Beam so that values will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))
+
+  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+  def test_dataframes_with_multi_index_get_result(self):
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(
+            direct_runner.DirectRunner()))
+
+    data = [
+        Record('a', 20, 170),
+        Record('a', 30, 170),
+        Record('b', 22, 180),
+        Record('c', 18, 150)
+    ]
+
+    aggregate = lambda df: df.groupby(['name', 'height']).mean()['age']
+
+    deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+    df_expected = aggregate(pd.DataFrame(data))
+
+    # Watch the local scope for Interactive Beam so that values will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10))
+
+  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+  def test_dataframes_same_cell_twice(self):
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(
+            direct_runner.DirectRunner()))
+    data = p | beam.Create(
+        [1, 2, 3]) | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x))
+    df = to_dataframe(data)
+
+    # Watch the local scope for Interactive Beam so that values will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
+    pd.testing.assert_series_equal(
+        df_expected['square'],
+        ib.collect(df['square'], n=10).reset_index(drop=True))
+    pd.testing.assert_series_equal(
+        df_expected['cube'],
+        ib.collect(df['cube'], n=10).reset_index(drop=True))
 
 
 if __name__ == '__main__':
diff --git 
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py 
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
index 0cc7022..ed921cc 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
@@ -22,10 +22,13 @@ For internal use only; no backwards-compatibility 
guarantees.
 
 import threading
 
+import pandas as pd
+
 from apache_beam.portability.api.beam_interactive_api_pb2 import 
TestStreamFileHeader
 from apache_beam.portability.api.beam_interactive_api_pb2 import 
TestStreamFileRecord
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 class Limiter:
@@ -105,7 +108,12 @@ class CountLimiter(ElementLimiter):
     # Otherwise, count everything else but the header of the file since it is
     # not an element.
     elif not isinstance(e, TestStreamFileHeader):
-      self._count += 1
+      # When elements are DataFrames, we want the output to be constrained by
+      # how many rows we have read, not how many DataFrames we have read.
+      if isinstance(e, WindowedValue) and isinstance(e.value, pd.DataFrame):
+        self._count += len(e.value)
+      else:
+        self._count += 1
 
   def is_triggered(self):
     return self._count >= self._max_count
diff --git 
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py 
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
index 2010720..2af1995 100644
--- 
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
@@ -17,9 +17,12 @@
 
 import unittest
 
+import pandas as pd
+
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive.options.capture_limiters import 
CountLimiter
 from apache_beam.runners.interactive.options.capture_limiters import 
ProcessingTimeLimiter
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 class CaptureLimitersTest(unittest.TestCase):
@@ -33,6 +36,19 @@ class CaptureLimitersTest(unittest.TestCase):
     limiter.update(4)
     self.assertTrue(limiter.is_triggered())
 
+  def test_count_limiter_with_dataframes(self):
+    limiter = CountLimiter(5)
+
+    # Test that empty dataframes don't count.
+    for _ in range(10):
+      df = WindowedValue(pd.DataFrame(), 0, [])
+      limiter.update(df)
+
+    self.assertFalse(limiter.is_triggered())
+    df = WindowedValue(pd.DataFrame({'col': list(range(10))}), 0, [])
+    limiter.update(df)
+    self.assertTrue(limiter.is_triggered())
+
   def test_processing_time_limiter(self):
     limiter = ProcessingTimeLimiter(max_duration_secs=2)
 
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py 
b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index 49568b1..f3baefa 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -296,8 +296,14 @@ class RecordingManager:
           watched_pcollections.add(val)
         elif isinstance(val, DeferredBase):
           watched_dataframes.add(val)
-    # Convert them all in a single step for efficiency.
-    for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
+
+    # Convert them one-by-one to generate a unique label for each. This allows
+    # caching at a more fine-grained granularity.
+    #
+    # TODO(BEAM-12388): investigate the mixing pcollections in multiple
+    # pipelines error when using the default label.
+    for df in watched_dataframes:
+      pcoll = to_pcollection(df, yield_elements='pandas', label=str(df._expr))
       watched_pcollections.add(pcoll)
     for pcoll in pcolls:
       if pcoll not in watched_pcollections:
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py 
b/sdks/python/apache_beam/runners/interactive/utils.py
index c415fd6..878ec23 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -101,8 +101,14 @@ def elements_to_df(elements, include_window_info=False, 
element_type=None):
     if include_window_info:
       windowed_info.append([e.timestamp.micros, e.windows, e.pane_info])
 
-  rows_df = pd.DataFrame(rows, columns=columns_names)
-  if include_window_info:
+  using_dataframes = isinstance(element_type, pd.DataFrame)
+  using_series = isinstance(element_type, pd.Series)
+  if using_dataframes or using_series:
+    rows_df = pd.concat(rows)
+  else:
+    rows_df = pd.DataFrame(rows, columns=columns_names)
+
+  if include_window_info and not using_series:
     windowed_info_df = pd.DataFrame(
         windowed_info, columns=['event_time', 'windows', 'pane_info'])
     final_df = pd.concat([rows_df, windowed_info_df], axis=1)
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py 
b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 7af9eb1..ecbba30 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -18,6 +18,7 @@
 import json
 import logging
 import unittest
+from typing import NamedTuple
 from unittest.mock import PropertyMock
 from unittest.mock import patch
 
@@ -25,7 +26,9 @@ import numpy as np
 import pandas as pd
 import pytest
 
+import apache_beam as beam
 from apache_beam import coders
+from apache_beam.dataframe.convert import to_dataframe
 from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import utils
@@ -35,16 +38,23 @@ from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+class Record(NamedTuple):
+  order_id: int
+  product_id: int
+  quantity: int
+
+
+def windowed_value(e):
+  from apache_beam.transforms.window import GlobalWindow
+  return WindowedValue(e, 1, [GlobalWindow()])
+
+
 class ParseToDataframeTest(unittest.TestCase):
   def test_parse_windowedvalue(self):
     """Tests that WindowedValues are supported but not present.
     """
-    from apache_beam.transforms.window import GlobalWindow
 
-    els = [
-        WindowedValue(('a', 2), 1, [GlobalWindow()]),
-        WindowedValue(('b', 3), 1, [GlobalWindow()])
-    ]
+    els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
 
     actual_df = utils.elements_to_df(els, include_window_info=False)
     expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
@@ -54,12 +64,8 @@ class ParseToDataframeTest(unittest.TestCase):
   def test_parse_windowedvalue_with_window_info(self):
     """Tests that WindowedValues are supported and have their own columns.
     """
-    from apache_beam.transforms.window import GlobalWindow
 
-    els = [
-        WindowedValue(('a', 2), 1, [GlobalWindow()]),
-        WindowedValue(('b', 3), 1, [GlobalWindow()])
-    ]
+    els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
 
     actual_df = utils.elements_to_df(els, include_window_info=True)
     expected_df = pd.DataFrame(
@@ -72,15 +78,13 @@ class ParseToDataframeTest(unittest.TestCase):
   def test_parse_windowedvalue_with_dicts(self):
     """Tests that dicts play well with WindowedValues.
     """
-    from apache_beam.transforms.window import GlobalWindow
-
     els = [
-        WindowedValue({
+        windowed_value({
             'b': 2, 'd': 4
-        }, 1, [GlobalWindow()]),
-        WindowedValue({
+        }),
+        windowed_value({
             'a': 1, 'b': 2, 'c': 3
-        }, 1, [GlobalWindow()])
+        })
     ]
 
     actual_df = utils.elements_to_df(els, include_window_info=True)
@@ -91,6 +95,31 @@ class ParseToDataframeTest(unittest.TestCase):
     # check_like so that ordering of indices doesn't matter.
     pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
 
+  def test_parse_dataframes(self):
+    """Tests that it correctly parses a DataFrame.
+    """
+    deferred = to_dataframe(beam.Pipeline() | beam.Create([Record(0, 0, 0)]))
+
+    els = [windowed_value(pd.DataFrame(Record(n, 0, 0))) for n in range(10)]
+
+    actual_df = utils.elements_to_df(
+        els, element_type=deferred._expr.proxy()).reset_index(drop=True)
+    expected_df = pd.concat([e.value for e in els], ignore_index=True)
+    pd.testing.assert_frame_equal(actual_df, expected_df)
+
+  def test_parse_series(self):
+    """Tests that it correctly parses a Pandas Series.
+    """
+    deferred = to_dataframe(beam.Pipeline()
+                            | beam.Create([Record(0, 0, 0)]))['order_id']
+
+    els = [windowed_value(pd.Series([n])) for n in range(10)]
+
+    actual_df = utils.elements_to_df(
+        els, element_type=deferred._expr.proxy()).reset_index(drop=True)
+    expected_df = pd.concat([e.value for e in els], ignore_index=True)
+    pd.testing.assert_series_equal(actual_df, expected_df)
+
 
 class ToElementListTest(unittest.TestCase):
   def test_test_stream_payload_events(self):

Reply via email to