[
https://issues.apache.org/jira/browse/BEAM-12388?focusedWorklogId=622794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-622794
]
ASF GitHub Bot logged work on BEAM-12388:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jul/21 23:40
Start Date: 14/Jul/21 23:40
Worklog Time Spent: 10m
Work Description: rohdesamuel commented on a change in pull request
#15146:
URL: https://github.com/apache/beam/pull/15146#discussion_r670023732
##########
File path:
sdks/python/apache_beam/runners/interactive/caching/expression_cache.py
##########
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import *
+
+import apache_beam as beam
+from apache_beam.dataframe import convert
+from apache_beam.dataframe import expressions
+
+
+class ExpressionCache(object):
Review comment:
Added a comment to answer your questions here. My personal style is to
not have constructors do too much work, so I kept the `replace_with_cached`
where it is. The lifecycle is temporary and the only side-effects are in the
modified expression. The cache isn't persisted between multiple runs, it just
uses existing caches from the dataframe and interactive modules.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 622794)
Time Spent: 1h 50m (was: 1h 40m)
> Improve caching experience on InteractiveRunner with dataframes
> ---------------------------------------------------------------
>
> Key: BEAM-12388
> URL: https://issues.apache.org/jira/browse/BEAM-12388
> Project: Beam
> Issue Type: Improvement
> Components: runner-py-interactive
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: P2
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Reusing the default label for to_pcollection when using the interactive
> runner results in caching errors when used with multiple pipelines:
>
>
> {{Traceback (most recent call last):
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py",
> line 389, in test_dataframes_with_multi_index_get_result
> pd.testing.assert_series_equal(df_expected, ib.collect(deferred_df, n=10))
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/utils.py",
> line 247, in run_within_progress_indicator
> return func(*args, **kwargs)
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py",
> line 579, in collect
> recording = recording_manager.record([pcoll], max_n=n,
> max_duration=duration)
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py",
> line 433, in record
> self._watch(pcolls)
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py",
> line 306, in _watch
> for pcoll in to_pcollection(*watched_dataframes,
> always_return_tuple=True):
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/dataframe/convert.py",
> line 196, in to_pcollection
> new_results = {p: extract_input(p)
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py",
> line 1086, in __ror__
> return self.transform.__ror__(pvalueish, self.label)
> File
> "/home/srohde/Workdir/beam/sdks/python/apache_beam/transforms/ptransform.py",
> line 587, in __ror__
> raise ValueError(
> ValueError: Mixing value from different pipelines not allowed.}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)