robertwb commented on a change in pull request #13739:
URL: https://github.com/apache/beam/pull/13739#discussion_r564849073
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -655,9 +660,13 @@ def process(
t=beam.DoFn.TimestampParam):
yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
+ extended_target = pcoll
+ if self._is_streaming_pcollection(pcoll):
Review comment:
We should *always* write windowing information to the cache: even in the
global window there may be other important information, such as timestamps,
that we need to preserve. This also simplifies the code below.
Whether to display it can be decided separately, e.g. by having the default
value of `include_window_info` depend on whether the windowing is the default.
The user can still set it explicitly either way if that's not what they want.
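A minimal sketch of the suggestion above, in plain Python rather than Beam code. `CachedElement`, `resolve_include_window_info`, and `display_rows` are hypothetical names introduced only for illustration; they are not part of the PR or of the Beam API. The idea shown is that the cache always stores the full windowed record, and only the display default depends on whether the windowing is the default.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass(frozen=True)
class CachedElement:
    """Hypothetical stand-in for a cached windowed value."""
    value: Any
    timestamp: float
    windows: Tuple[str, ...]


def resolve_include_window_info(
        user_choice: Optional[bool], windowing_is_default: bool) -> bool:
    """An explicit user choice wins; otherwise window info is hidden
    only when the windowing is the default."""
    if user_choice is not None:
        return user_choice
    return not windowing_is_default


def display_rows(
        cached: List[CachedElement], include_window_info: bool) -> List[Any]:
    """The cache always holds full records; only the view differs."""
    if include_window_info:
        return [(e.value, e.timestamp, e.windows) for e in cached]
    return [e.value for e in cached]


# The cache keeps timestamps even for globally windowed data.
cache = [
    CachedElement("a", 10.0, ("GlobalWindow",)),
    CachedElement("b", 12.5, ("GlobalWindow",)),
]

# Default windowing, no user preference: show bare values.
default_view = display_rows(cache, resolve_include_window_info(None, True))

# Same cache, but the user explicitly asks for window info.
explicit_view = display_rows(cache, resolve_include_window_info(True, True))
```

Because the timestamps are never dropped at write time, switching between the two views needs no re-computation of the cached data.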
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -595,6 +596,10 @@ def _process(self, pcoll):
self._cache_manager = ie.current_env().get_cache_manager(
self._user_pipeline, create_if_absent=True)
+ def _is_streaming_pcollection(self, pcoll):
Review comment:
This should be called `_is_globally_windowed`.
Whether a PCollection is streaming is orthogonal to how it is windowed.
##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -595,6 +596,10 @@ def _process(self, pcoll):
self._cache_manager = ie.current_env().get_cache_manager(
self._user_pipeline, create_if_absent=True)
+ def _is_streaming_pcollection(self, pcoll):
Review comment:
An even better test might be `pcoll.windowing.is_default()`.
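To illustrate the two comments on this line, here is a small sketch using plain-Python stand-ins. `WindowingSketch` and `PCollectionSketch` are hypothetical classes, not Beam types, and the sketch assumes that "default" means every windowing setting is at its default value, which is stricter than merely being in the global window.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowingSketch:
    """Hypothetical stand-in for a PCollection's windowing strategy."""
    window_fn: str = "GlobalWindows"
    trigger: str = "DefaultTrigger"
    accumulation_mode: str = "DISCARDING"

    def is_globally_windowed(self) -> bool:
        # Looks only at the window function.
        return self.window_fn == "GlobalWindows"

    def is_default(self) -> bool:
        # Assumption: default means *all* settings are at their defaults.
        return self == WindowingSketch()


@dataclass(frozen=True)
class PCollectionSketch:
    """Hypothetical stand-in; boundedness is tracked separately."""
    windowing: WindowingSketch
    is_bounded: bool


# Bounded (batch) data can be in non-global windows...
batch_fixed = PCollectionSketch(
    WindowingSketch(window_fn="FixedWindows(60)"), is_bounded=True)

# ...and unbounded (streaming) data can use the default windowing.
streaming_global = PCollectionSketch(WindowingSketch(), is_bounded=False)

# Globally windowed, but with a custom trigger: not the default.
custom_trigger = PCollectionSketch(
    WindowingSketch(trigger="AfterCount(10)"), is_bounded=False)
```

The first two cases show why a name mentioning "streaming" is misleading for a windowing check; the third shows how a default check can differ from a global-window check.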