[
https://issues.apache.org/jira/browse/BEAM-10603?focusedWorklogId=464300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464300
]
ASF GitHub Bot logged work on BEAM-10603:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jul/20 07:27
Start Date: 30/Jul/20 07:27
Worklog Time Spent: 10m
Work Description: rohdesamuel commented on a change in pull request #12411:
URL: https://github.com/apache/beam/pull/12411#discussion_r462637247
##########
File path: sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
##########
@@ -71,3 +87,62 @@ def _trigger(self):
   def is_triggered(self):
     return self._triggered
+
+
+class CountLimiter(ElementLimiter):
+  """Limits by counting the number of elements seen."""
+  def __init__(self, max_count):
+    self._max_count = max_count
+    self._count = 0
+
+  def update(self, e):
+    # A TestStreamFileRecord can contain many elements at once. If e is a file
+    # record, then count the number of elements in the bundle.
+    if isinstance(e, TestStreamFileRecord):
+      if not e.recorded_event.element_event:
+        return
+      self._count += len(e.recorded_event.element_event.elements)
+
+    # Otherwise, count everything else but the header of the file since it is
+    # not an element.
+    elif not isinstance(e, TestStreamFileHeader):
+      self._count += 1
+
+  def is_triggered(self):
+    return self._count >= self._max_count
+
+
+class ProcessingTimeLimiter(ElementLimiter):
+  """Limits by how long the ProcessingTime passed in the element stream.
+
+  This measures the duration from either a constructor argument or from the
+  first element in the stream. Each subsequent element has a delta
+  "advance_duration" that moves the internal clock forward. This triggers when
+  the duration from the internal clock and the start exceeds the given duration.
+  """
+  def __init__(self, max_duration_secs, start_secs=None):
Review comment:
Ack. I looked around and I wasn't using the start parameter, so I
removed it.
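
For illustration only, here is a minimal standalone sketch of how a processing-time limiter could look without the start parameter. It is not the code from the pull request: AdvanceEvent is a hypothetical stand-in for the processing-time events in the stream, the class does not subclass the real ElementLimiter, and the internal clock is assumed to start at zero.

```python
from dataclasses import dataclass


@dataclass
class AdvanceEvent:
  """Hypothetical stand-in for a processing-time event in the stream."""
  advance_duration_secs: float


class ProcessingTimeLimiterSketch:
  """Triggers once the accumulated processing time exceeds the limit.

  Assumption: the clock starts at zero, so no start argument is needed.
  """
  def __init__(self, max_duration_secs):
    self._max_duration_secs = max_duration_secs
    self._elapsed_secs = 0.0

  def update(self, e):
    # Only processing-time events move the internal clock forward;
    # everything else is ignored by this limiter.
    if isinstance(e, AdvanceEvent):
      self._elapsed_secs += e.advance_duration_secs

  def is_triggered(self):
    return self._elapsed_secs > self._max_duration_secs
```

With a limit of 5 seconds, two 3-second advances would trigger the sketch, while a single one would not.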
Issue Time Tracking
-------------------
Worklog Id: (was: 464300)
Remaining Estimate: 0h
Time Spent: 10m
> Large Source Recording for Interactive Runner
> ---------------------------------------------
>
> Key: BEAM-10603
> URL: https://issues.apache.org/jira/browse/BEAM-10603
> Project: Beam
> Issue Type: Improvement
> Components: runner-py-interactive
> Reporter: Sam Rohde
> Assignee: Sam Rohde
> Priority: P1
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This changes the Interactive Runner to create a long-running background
> caching job that is decoupled from the user pipeline. When a user invokes
> collect() or show(), the runner reads from the cache to compute the requested
> PCollections. Previously, the user had to wait for the cache to be fully
> written. With this change, the user can start experimenting immediately.