This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 61f0b34 [BEAM-12246] Fix ib.collect(dataframe) indexing (#14778)
61f0b34 is described below
commit 61f0b3449f57cafb19377ba89ad2fac101f514a9
Author: Sam sam <[email protected]>
AuthorDate: Fri May 21 15:23:27 2021 -0700
[BEAM-12246] Fix ib.collect(dataframe) indexing (#14778)
* [BEAM-12246] Fix ib.collect(dataframe) indexing
* undo commenting out progress_indicated
* remove reset_unnamed_index
* update type for elements_to_df
* Add todos
---
.../runners/interactive/interactive_beam.py | 21 +++-
.../runners/interactive/interactive_runner_test.py | 119 ++++++++++++++++++++-
.../interactive/options/capture_limiters.py | 10 +-
.../interactive/options/capture_limiters_test.py | 16 +++
.../runners/interactive/recording_manager.py | 10 +-
.../apache_beam/runners/interactive/utils.py | 10 +-
.../apache_beam/runners/interactive/utils_test.py | 61 ++++++++---
7 files changed, 219 insertions(+), 28 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index fd42053..8db1037 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -527,6 +527,8 @@ def collect(pcoll, n='inf', duration='inf',
include_window_info=False):
n: (optional) max number of elements to visualize. Default 'inf'.
duration: (optional) max duration of elements to read in integer seconds or
a string duration. Default 'inf'.
+ include_window_info: (optional) if True, appends the windowing information
+ to each row. Default False.
For example::
@@ -537,8 +539,18 @@ def collect(pcoll, n='inf', duration='inf',
include_window_info=False):
# Run the pipeline and bring the PCollection into memory as a Dataframe.
in_memory_square = head(square, n=5)
"""
+ # Remember the element type so we can make an informed decision on how to
+ # collect the result in elements_to_df.
if isinstance(pcoll, DeferredBase):
- pcoll = to_pcollection(pcoll)
+ # Get the proxy so we can get the output shape of the DataFrame.
+ # TODO(BEAM-11064): Once type hints are implemented for pandas, use those
+ # instead of the proxy.
+ element_type = pcoll._expr.proxy()
+ pcoll = to_pcollection(
+ pcoll, yield_elements='pandas', label=str(pcoll._expr))
+ watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
+ else:
+ element_type = pcoll.element_type
assert isinstance(pcoll, beam.pvalue.PCollection), (
'{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
@@ -572,10 +584,15 @@ def collect(pcoll, n='inf', duration='inf',
include_window_info=False):
recording.cancel()
return pd.DataFrame()
+ if n == float('inf'):
+ n = None
+
+  # Collecting DataFrames may have a length > n, so slice again to be sure. Note
+  # that array[:None] returns everything.
return elements_to_df(
elements,
include_window_info=include_window_info,
- element_type=pcoll.element_type)
+ element_type=element_type)[:n]
@progress_indicated
diff --git
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 27f1350..fe6989e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,13 +24,13 @@ This module is experimental. No backwards-compatibility
guarantees.
import sys
import unittest
+from typing import NamedTuple
from unittest.mock import patch
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
-from apache_beam.dataframe.convert import to_pcollection
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
@@ -54,6 +54,12 @@ def print_with_message(msg):
return printer
+class Record(NamedTuple):
+ name: str
+ age: int
+ height: int
+
+
class InteractiveRunnerTest(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
def test_basic(self):
@@ -285,7 +291,6 @@ class InteractiveRunnerTest(unittest.TestCase):
data = p | beam.Create(
[1, 2, 3]) | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x))
df = to_dataframe(data)
- pcoll = to_pcollection(df)
# Watch the local scope for Interactive Beam so that values will be cached.
ib.watch(locals())
@@ -295,9 +300,113 @@ class InteractiveRunnerTest(unittest.TestCase):
ie.current_env().track_user_pipelines()
df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
- pd.testing.assert_frame_equal(df_expected, ib.collect(data, n=10))
- pd.testing.assert_frame_equal(df_expected, ib.collect(df, n=10))
- pd.testing.assert_frame_equal(df_expected, ib.collect(pcoll, n=10))
+ pd.testing.assert_frame_equal(
+ df_expected, ib.collect(df, n=10).reset_index(drop=True))
+
+ @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+ def test_dataframes_with_grouped_index(self):
+ p = beam.Pipeline(
+ runner=interactive_runner.InteractiveRunner(
+ direct_runner.DirectRunner()))
+
+ data = [
+ Record('a', 20, 170),
+ Record('a', 30, 170),
+ Record('b', 22, 180),
+ Record('c', 18, 150)
+ ]
+
+ aggregate = lambda df: df.groupby('height').mean()
+
+ deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+ df_expected = aggregate(pd.DataFrame(data))
+
+ # Watch the local scope for Interactive Beam so that values will be cached.
+ ib.watch(locals())
+
+ # This is normally done in the interactive_utils when a transform is
+ # applied but needs an IPython environment. So we manually run this here.
+ ie.current_env().track_user_pipelines()
+
+ pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))
+
+ @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+ def test_dataframes_with_multi_index(self):
+ p = beam.Pipeline(
+ runner=interactive_runner.InteractiveRunner(
+ direct_runner.DirectRunner()))
+
+ data = [
+ Record('a', 20, 170),
+ Record('a', 30, 170),
+ Record('b', 22, 180),
+ Record('c', 18, 150)
+ ]
+
+ aggregate = lambda df: df.groupby(['name', 'height']).mean()
+
+ deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+ df_expected = aggregate(pd.DataFrame(data))
+
+ # Watch the local scope for Interactive Beam so that values will be cached.
+ ib.watch(locals())
+
+ # This is normally done in the interactive_utils when a transform is
+ # applied but needs an IPython environment. So we manually run this here.
+ ie.current_env().track_user_pipelines()
+
+ pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))
+
+ @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+ def test_dataframes_with_multi_index_get_result(self):
+ p = beam.Pipeline(
+ runner=interactive_runner.InteractiveRunner(
+ direct_runner.DirectRunner()))
+
+ data = [
+ Record('a', 20, 170),
+ Record('a', 30, 170),
+ Record('b', 22, 180),
+ Record('c', 18, 150)
+ ]
+
+ aggregate = lambda df: df.groupby(['name', 'height']).mean()['age']
+
+ deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
+ df_expected = aggregate(pd.DataFrame(data))
+
+ # Watch the local scope for Interactive Beam so that values will be cached.
+ ib.watch(locals())
+
+ # This is normally done in the interactive_utils when a transform is
+ # applied but needs an IPython environment. So we manually run this here.
+ ie.current_env().track_user_pipelines()
+
+ pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10))
+
+ @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
+ def test_dataframes_same_cell_twice(self):
+ p = beam.Pipeline(
+ runner=interactive_runner.InteractiveRunner(
+ direct_runner.DirectRunner()))
+ data = p | beam.Create(
+ [1, 2, 3]) | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x))
+ df = to_dataframe(data)
+
+ # Watch the local scope for Interactive Beam so that values will be cached.
+ ib.watch(locals())
+
+ # This is normally done in the interactive_utils when a transform is
+ # applied but needs an IPython environment. So we manually run this here.
+ ie.current_env().track_user_pipelines()
+
+ df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
+ pd.testing.assert_series_equal(
+ df_expected['square'],
+ ib.collect(df['square'], n=10).reset_index(drop=True))
+ pd.testing.assert_series_equal(
+ df_expected['cube'],
+ ib.collect(df['cube'], n=10).reset_index(drop=True))
if __name__ == '__main__':
diff --git
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
index 0cc7022..ed921cc 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
@@ -22,10 +22,13 @@ For internal use only; no backwards-compatibility
guarantees.
import threading
+import pandas as pd
+
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileHeader
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.utils.windowed_value import WindowedValue
class Limiter:
@@ -105,7 +108,12 @@ class CountLimiter(ElementLimiter):
# Otherwise, count everything else but the header of the file since it is
# not an element.
elif not isinstance(e, TestStreamFileHeader):
- self._count += 1
+ # When elements are DataFrames, we want the output to be constrained by
+ # how many rows we have read, not how many DataFrames we have read.
+ if isinstance(e, WindowedValue) and isinstance(e.value, pd.DataFrame):
+ self._count += len(e.value)
+ else:
+ self._count += 1
def is_triggered(self):
return self._count >= self._max_count
diff --git
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
index 2010720..2af1995 100644
---
a/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
+++
b/sdks/python/apache_beam/runners/interactive/options/capture_limiters_test.py
@@ -17,9 +17,12 @@
import unittest
+import pandas as pd
+
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+from apache_beam.utils.windowed_value import WindowedValue
class CaptureLimitersTest(unittest.TestCase):
@@ -33,6 +36,19 @@ class CaptureLimitersTest(unittest.TestCase):
limiter.update(4)
self.assertTrue(limiter.is_triggered())
+ def test_count_limiter_with_dataframes(self):
+ limiter = CountLimiter(5)
+
+ # Test that empty dataframes don't count.
+ for _ in range(10):
+ df = WindowedValue(pd.DataFrame(), 0, [])
+ limiter.update(df)
+
+ self.assertFalse(limiter.is_triggered())
+ df = WindowedValue(pd.DataFrame({'col': list(range(10))}), 0, [])
+ limiter.update(df)
+ self.assertTrue(limiter.is_triggered())
+
def test_processing_time_limiter(self):
limiter = ProcessingTimeLimiter(max_duration_secs=2)
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py
b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index 49568b1..f3baefa 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -296,8 +296,14 @@ class RecordingManager:
watched_pcollections.add(val)
elif isinstance(val, DeferredBase):
watched_dataframes.add(val)
- # Convert them all in a single step for efficiency.
- for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
+
+ # Convert them one-by-one to generate a unique label for each. This allows
+ # caching at a more fine-grained granularity.
+ #
+ # TODO(BEAM-12388): investigate the mixing pcollections in multiple
+ # pipelines error when using the default label.
+ for df in watched_dataframes:
+ pcoll = to_pcollection(df, yield_elements='pandas', label=str(df._expr))
watched_pcollections.add(pcoll)
for pcoll in pcolls:
if pcoll not in watched_pcollections:
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py
b/sdks/python/apache_beam/runners/interactive/utils.py
index c415fd6..878ec23 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -101,8 +101,14 @@ def elements_to_df(elements, include_window_info=False, element_type=None):
if include_window_info:
windowed_info.append([e.timestamp.micros, e.windows, e.pane_info])
- rows_df = pd.DataFrame(rows, columns=columns_names)
- if include_window_info:
+ using_dataframes = isinstance(element_type, pd.DataFrame)
+ using_series = isinstance(element_type, pd.Series)
+ if using_dataframes or using_series:
+ rows_df = pd.concat(rows)
+ else:
+ rows_df = pd.DataFrame(rows, columns=columns_names)
+
+ if include_window_info and not using_series:
windowed_info_df = pd.DataFrame(
windowed_info, columns=['event_time', 'windows', 'pane_info'])
final_df = pd.concat([rows_df, windowed_info_df], axis=1)
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py
b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 7af9eb1..ecbba30 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -18,6 +18,7 @@
import json
import logging
import unittest
+from typing import NamedTuple
from unittest.mock import PropertyMock
from unittest.mock import patch
@@ -25,7 +26,9 @@ import numpy as np
import pandas as pd
import pytest
+import apache_beam as beam
from apache_beam import coders
+from apache_beam.dataframe.convert import to_dataframe
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
@@ -35,16 +38,23 @@ from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
+class Record(NamedTuple):
+ order_id: int
+ product_id: int
+ quantity: int
+
+
+def windowed_value(e):
+ from apache_beam.transforms.window import GlobalWindow
+ return WindowedValue(e, 1, [GlobalWindow()])
+
+
class ParseToDataframeTest(unittest.TestCase):
def test_parse_windowedvalue(self):
"""Tests that WindowedValues are supported but not present.
"""
- from apache_beam.transforms.window import GlobalWindow
- els = [
- WindowedValue(('a', 2), 1, [GlobalWindow()]),
- WindowedValue(('b', 3), 1, [GlobalWindow()])
- ]
+ els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
actual_df = utils.elements_to_df(els, include_window_info=False)
expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
@@ -54,12 +64,8 @@ class ParseToDataframeTest(unittest.TestCase):
def test_parse_windowedvalue_with_window_info(self):
"""Tests that WindowedValues are supported and have their own columns.
"""
- from apache_beam.transforms.window import GlobalWindow
- els = [
- WindowedValue(('a', 2), 1, [GlobalWindow()]),
- WindowedValue(('b', 3), 1, [GlobalWindow()])
- ]
+ els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
actual_df = utils.elements_to_df(els, include_window_info=True)
expected_df = pd.DataFrame(
@@ -72,15 +78,13 @@ class ParseToDataframeTest(unittest.TestCase):
def test_parse_windowedvalue_with_dicts(self):
"""Tests that dicts play well with WindowedValues.
"""
- from apache_beam.transforms.window import GlobalWindow
-
els = [
- WindowedValue({
+ windowed_value({
'b': 2, 'd': 4
- }, 1, [GlobalWindow()]),
- WindowedValue({
+ }),
+ windowed_value({
'a': 1, 'b': 2, 'c': 3
- }, 1, [GlobalWindow()])
+ })
]
actual_df = utils.elements_to_df(els, include_window_info=True)
@@ -91,6 +95,31 @@ class ParseToDataframeTest(unittest.TestCase):
# check_like so that ordering of indices doesn't matter.
pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
+ def test_parse_dataframes(self):
+ """Tests that it correctly parses a DataFrame.
+ """
+ deferred = to_dataframe(beam.Pipeline() | beam.Create([Record(0, 0, 0)]))
+
+ els = [windowed_value(pd.DataFrame(Record(n, 0, 0))) for n in range(10)]
+
+ actual_df = utils.elements_to_df(
+ els, element_type=deferred._expr.proxy()).reset_index(drop=True)
+ expected_df = pd.concat([e.value for e in els], ignore_index=True)
+ pd.testing.assert_frame_equal(actual_df, expected_df)
+
+ def test_parse_series(self):
+ """Tests that it correctly parses a Pandas Series.
+ """
+ deferred = to_dataframe(beam.Pipeline()
+ | beam.Create([Record(0, 0, 0)]))['order_id']
+
+ els = [windowed_value(pd.Series([n])) for n in range(10)]
+
+ actual_df = utils.elements_to_df(
+ els, element_type=deferred._expr.proxy()).reset_index(drop=True)
+ expected_df = pd.concat([e.value for e in els], ignore_index=True)
+ pd.testing.assert_series_equal(actual_df, expected_df)
+
class ToElementListTest(unittest.TestCase):
def test_test_stream_payload_events(self):