[
https://issues.apache.org/jira/browse/BEAM-11629?focusedWorklogId=542429&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-542429
]
ASF GitHub Bot logged work on BEAM-11629:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jan/21 21:38
Start Date: 26/Jan/21 21:38
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #13739:
URL: https://github.com/apache/beam/pull/13739#discussion_r564849073
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -655,9 +660,13 @@ def process(
t=beam.DoFn.TimestampParam):
yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
+ extended_target = pcoll
+ if self._is_streaming_pcollection(pcoll):
Review comment:
We should *always* be writing windowing information to the cache, as
even in the global window there may be other important information such as
timestamps that we need to preserve. This simplifies the code below as well.
We can conditionally decide to display this (i.e. have the default value of
`include_window_info` depend on whether the windowing is default, which the
user can manually specify one way or the other if that's not what they want).
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -595,6 +596,10 @@ def _process(self, pcoll):
self._cache_manager = ie.current_env().get_cache_manager(
self._user_pipeline, create_if_absent=True)
+ def _is_streaming_pcollection(self, pcoll):
Review comment:
This should be called _is_globally_windowed.
Whether or not something is streaming is orthogonal to what windowing it is
in.
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -595,6 +596,10 @@ def _process(self, pcoll):
self._cache_manager = ie.current_env().get_cache_manager(
self._user_pipeline, create_if_absent=True)
+ def _is_streaming_pcollection(self, pcoll):
Review comment:
An even better test might be `pcoll.windowing.is_default()`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 542429)
Time Spent: 1h 10m (was: 1h)
> Optimize the cache storage for InteractiveRunner
> ------------------------------------------------
>
> Key: BEAM-11629
> URL: https://issues.apache.org/jira/browse/BEAM-11629
> Project: Beam
> Issue Type: Improvement
> Components: runner-py-interactive
> Reporter: Dmytro Kozhevin
> Assignee: Dmytro Kozhevin
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Currently InteractiveRunner wraps every record of the cached PCollection into
> WindowedValue. There is 2 problems about this:
> 1) The windowing information is unnecessary for the batch-mode runs
> (everything is in the same global window).
> 2) Since the cache is stored as text, we pickle the WindowedValue, which adds
> ~500 bytes of data to every record (e.g. a cache of just 1000000 integers
> would take ~500MB instead of ~4MB).
> These issues significantly slow down the interactive runs for data with lots
> of small rows.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)