pabloem commented on a change in pull request #12799:
URL: https://github.com/apache/beam/pull/12799#discussion_r489494335
##########
File path: sdks/python/apache_beam/runners/interactive/utils.py
##########
@@ -34,7 +34,8 @@ def to_element_list(
reader, # type: Generator[Union[TestStreamPayload.Event,
WindowedValueHolder]]
coder, # type: Coder
include_window_info, # type: bool
- n=None # type: int
+ n=None, # type: int
+ include_teststream_events=False, # type: bool
Review comment:
In this case, 'teststream events' seems to refer to watermark/processing
time events only. Perhaps you can call it 'include_time_events' or something
like that? (since Data events also come from the teststream but are not
affected by this flag)
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -256,7 +259,7 @@ def describe(self):
size = sum(
cache_manager.size('full', s.cache_key) for s in
self._streams.values())
- return {'size': size, 'start': self._start}
+ return {'size': size}
Review comment:
Why are you removing start from here?
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -314,15 +330,55 @@ def cancel(self):
r.wait_until_finish()
self._recordings = set()
+ # The recordings rely on a reference to the BCJ to correctly finish. So we
+ # evict the BCJ after they complete.
+ ie.current_env().evict_background_caching_job(self.user_pipeline)
+
def describe(self):
# type: () -> dict[str, int]
"""Returns a dictionary describing the cache and recording."""
+ cache_manager = ie.current_env().get_cache_manager(self.user_pipeline)
+ capture_size = getattr(cache_manager, 'capture_size', 0)
+
descriptions = [r.describe() for r in self._recordings]
- size = sum(d['size'] for d in descriptions)
- start = min(d['start'] for d in descriptions)
- return {'size': size, 'start': start}
+ if descriptions:
+ size = sum(d['size'] for d in descriptions) + capture_size
+ else:
+ size = capture_size
Review comment:
You can just add line 347 without the if/else. sum(...) over an empty
list will return 0.
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -114,14 +113,19 @@ def read(self, tail=True):
# all elements from the cache were read. In the latter situation, it may be
# the case that the pipeline was still running. Thus, another invocation of
# `read` will yield new elements.
+ count_limiter = CountLimiter(self._n)
+ time_limiter = ProcessingTimeLimiter(self._duration_secs)
+ limiters = (count_limiter, time_limiter)
for e in utils.to_element_list(reader,
coder,
include_window_info=True,
- n=self._n):
- for l in limiters:
- l.update(e)
-
- yield e
+ n=self._n,
+ include_teststream_events=True):
+ if isinstance(e, TestStreamPayload.Event):
+ time_limiter.update(e)
+ else:
+ count_limiter.update(e)
+ yield e
Review comment:
Are Data elements sent decoded, and that's why we have this if/else?
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager_test.py
##########
@@ -149,43 +152,37 @@ def test_read_n(self):
def test_read_duration(self):
"""Test that the stream only reads a 'duration' of elements."""
+ def as_windowed_value(element):
+ return WindowedValueHolder(WindowedValue(element, 0, []))
values = (FileRecordsBuilder(tag=self.cache_key)
.advance_processing_time(1)
- .add_element(element=0, event_time_secs=0)
+ .add_element(element=as_windowed_value(0), event_time_secs=0)
.advance_processing_time(1)
- .add_element(element=1, event_time_secs=1)
+ .add_element(element=as_windowed_value(1), event_time_secs=1)
.advance_processing_time(1)
- .add_element(element=2, event_time_secs=3)
+ .add_element(element=as_windowed_value(2), event_time_secs=3)
.advance_processing_time(1)
- .add_element(element=3, event_time_secs=4)
+ .add_element(element=as_windowed_value(3), event_time_secs=4)
.advance_processing_time(1)
- .add_element(element=4, event_time_secs=5)
+ .add_element(element=as_windowed_value(4), event_time_secs=5)
.build()) # yapf: disable
+ values = [
+ v.recorded_event for v in values if isinstance(v, TestStreamFileRecord)
+ ]
+
self.mock_result.set_state(PipelineState.DONE)
self.cache.write(values, 'full', self.cache_key)
- self.cache.save_pcoder(None, 'full', self.cache_key)
-
- # The elements read from the cache are TestStreamFileRecord instances and
- # have the underlying elements encoded. This method decodes the elements
- # from the TestStreamFileRecord.
- def get_elements(events):
- coder = coders.FastPrimitivesCoder()
- elements = []
- for e in events:
- if not isinstance(e, TestStreamFileRecord):
- continue
-
- if e.recorded_event.element_event:
- elements += ([
- coder.decode(el.encoded_element)
- for el in e.recorded_event.element_event.elements
- ])
- return elements
+ self.cache.save_pcoder(coders.FastPrimitivesCoder(), 'full',
self.cache_key)
# The following tests a progression of reading different durations from the
# cache.
+
+ # Small convienence function for getting the values.
+ def get_elements(events):
+ return [e.value for e in events]
Review comment:
I am trying to figure out - where did we stop passing encoded data
events?
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -314,15 +330,55 @@ def cancel(self):
r.wait_until_finish()
self._recordings = set()
+ # The recordings rely on a reference to the BCJ to correctly finish. So we
+ # evict the BCJ after they complete.
+ ie.current_env().evict_background_caching_job(self.user_pipeline)
+
def describe(self):
# type: () -> dict[str, int]
"""Returns a dictionary describing the cache and recording."""
+ cache_manager = ie.current_env().get_cache_manager(self.user_pipeline)
+ capture_size = getattr(cache_manager, 'capture_size', 0)
+
descriptions = [r.describe() for r in self._recordings]
- size = sum(d['size'] for d in descriptions)
- start = min(d['start'] for d in descriptions)
- return {'size': size, 'start': start}
+ if descriptions:
+ size = sum(d['size'] for d in descriptions) + capture_size
+ else:
+ size = capture_size
+ start = self._start_time_sec
+ bcj = ie.current_env().get_background_caching_job(self.user_pipeline)
+ if bcj:
+ state = bcj.state
+ else:
+ state = PipelineState.STOPPED
+ return {'size': size, 'start': start, 'state': state}
+
+ def record_pipeline(self):
+ # type: () -> bool
+
+ """Starts a background caching job for this RecordingManager's pipeline."""
+
+ runner = self.user_pipeline.runner
+ if isinstance(runner, ir.InteractiveRunner):
+ runner = runner._underlying_runner
+
+ # Make sure that sources without a user reference are still cached.
+ pi.watch_sources(self.user_pipeline)
+
+ # Attempt to run background caching job to record any sources.
+ if ie.current_env().is_in_ipython:
+ warnings.filterwarnings(
+ 'ignore',
+ 'options is deprecated since First stable release. References to '
+ '<pipeline>.options will not be supported',
+ category=DeprecationWarning)
+ if bcj.attempt_to_run_background_caching_job(
+ runner, self.user_pipeline, options=self.user_pipeline.options):
+ self._start_time_sec = time.time()
+ return True
+ return False
Review comment:
Would this failure be logged / handled somewhere? Maybe we should log
the failure to start the BCJ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]