rohdesamuel commented on a change in pull request #14778:
URL: https://github.com/apache/beam/pull/14778#discussion_r634735443
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -298,8 +298,12 @@ def _watch(self, pcolls):
watched_pcollections.add(val)
elif isinstance(val, DeferredBase):
watched_dataframes.add(val)
- # Convert them all in a single step for efficiency.
- for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
+
+ # Convert them one-by-one to generate a unique label for each. This allows
+ # caching at a more fine-grained granularity.
+ for df in watched_dataframes:
+ pcoll = to_pcollection(
+ df, yield_elements='pandas', label=str(id(df._expr._id)))
Review comment:
I'm getting a lot of errors when I try this out, so I think I'll
investigate it in a follow-up after this PR.
```
Traceback (most recent call last):
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py", line 389, in test_dataframes_with_multi_index_get_result
    pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10))
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/utils.py", line 247, in run_within_progress_indicator
    return func(*args, **kwargs)
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py", line 579, in collect
    recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 433, in record
    self._watch(pcolls)
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py", line 306, in _watch
    for pcoll in to_pcollection(*watched_dataframes, always_return_tuple=True):
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/dataframe/convert.py", line 196, in to_pcollection
    new_results = {p: extract_input(p)
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 1086, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py", line 587, in __ror__
    raise ValueError(
ValueError: Mixing value from different pipelines not allowed.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org