[
https://issues.apache.org/jira/browse/BEAM-12388?focusedWorklogId=622787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-622787
]
ASF GitHub Bot logged work on BEAM-12388:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jul/21 23:19
Start Date: 14/Jul/21 23:19
Worklog Time Spent: 10m
Work Description: rohdesamuel commented on a change in pull request
#15146:
URL: https://github.com/apache/beam/pull/15146#discussion_r670016061
##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -267,3 +270,17 @@ def return_as_json(*args, **kwargs):
return str(return_value)
return return_as_json
+
+
+def deferred_df_to_pcollection(df):
+  assert isinstance(df, DeferredBase), '{} is not a DeferredBase'.format(df)
+
+  # The proxy is used to output a DataFrame with the correct columns.
+  #
+  # TODO(BEAM-11064): Once type hints are implemented for pandas, use those
+  # instead of the proxy.
+  cache = ExpressionCache()
+  cache.replace_with_cached(df._expr)
+
+  proxy = df._expr.proxy()
+  return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy
Review comment:
No, not yet. The proxy is different from pcoll.element_type: it is a tool
used in the Beam DataFrames implementation to keep track of the column names
and index. Once the TODO above is resolved, however, it can be replaced with
pcoll.element_type.
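
To illustrate what "keeping track of the column names and index" means, here is a minimal sketch in plain pandas. It does not use Beam's API: `make_proxy` is a hypothetical helper, and the assumption is only that a proxy behaves like an empty pandas object with the same schema as the real data.

```python
import pandas as pd


def make_proxy(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: return a zero-row frame with df's schema.

    Slicing off every row keeps the column names, the dtypes, and the
    index name, which is the metadata the review comment describes.
    """
    return df.iloc[:0]


# Illustrative data only; these names are not from the pull request.
data = pd.DataFrame({"word": ["a", "b"], "count": [1, 2]}).set_index("word")
proxy = make_proxy(data)

print(list(proxy.columns))  # column names survive
print(proxy.index.name)     # index name survives
print(len(proxy))           # but the proxy holds no rows
```

A schema carried this way is enough to rebuild a correctly shaped DataFrame downstream, which is why the proxy is returned alongside the PCollection until type hints can carry the same information.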
Issue Time Tracking
-------------------
Worklog Id: (was: 622787)
Time Spent: 1.5h (was: 1h 20m)
> Improve caching experience on InteractiveRunner with dataframes
> ---------------------------------------------------------------
>
> Key: BEAM-12388
> URL: https://issues.apache.org/jira/browse/BEAM-12388
> Project: Beam
> Issue Type: Improvement
> Components: runner-py-interactive
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: P2
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Reusing the default label for to_pcollection with the interactive runner
> causes caching errors when DataFrames from multiple pipelines are involved:
>
>
> {{Traceback (most recent call last):
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py", line 389, in test_dataframes_with_multi_index_get_result
>     pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10))
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/utils.py", line 247, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py", line 579, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 433, in record
>     self._watch(pcolls)
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 306, in _watch
>     for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/dataframe/convert.py", line 196, in to_pcollection
>     new_results = {p: extract_input(p)
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 1086, in __ror__
>     return self.transform.__ror__(pvalueish, self.label)
>   File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 587, in __ror__
>     raise ValueError(
> ValueError: Mixing value from different pipelines not allowed.}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)