rohdesamuel commented on code in PR #27157:
URL: https://github.com/apache/beam/pull/27157#discussion_r1240196932
##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -268,6 +268,8 @@ def get_responses():
work_request)
finally:
self._alive = False
+ if self.data_sampler:
Review Comment:
Done
##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -67,68 +115,127 @@ def remove_windowed_value(self, el: Union[WindowedValue,
Any]) -> Any:
return self.remove_windowed_value(el.value)
return el
- def time(self) -> float:
- """Returns the current time. Used for mocking out the clock for testing."""
- return self._clock.time() if self._clock else time.time()
+ def peek(self) -> List[bytes]:
Review Comment:
Done
##########
sdks/python/apache_beam/runners/worker/data_sampler.py:
##########
@@ -67,68 +115,127 @@ def remove_windowed_value(self, el: Union[WindowedValue,
Any]) -> Any:
return self.remove_windowed_value(el.value)
Review Comment:
done
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -197,6 +206,15 @@ def update_counters_start(self, windowed_value):
# type: (WindowedValue) -> None
self.opcounter.update_from(windowed_value)
+ # The following code is inlined instead of calling a function. Because this
+ # runs for every element, a function call is too expensive (on the order of
+ # 100s of nanoseconds). Furthermore, a lock was purposefully not used
+ # between here and the DataSampler, because it would add another operation
+ # per element. The tradeoff is that some samples might be dropped, but that
+ # is better than the alternative of sampling the same element twice.
+ self.element_sampler.el = windowed_value
+ self.element_sampler.has_element = True
Review Comment:
Did some testing, and they are roughly equivalent. I'll change this to an if
check to make it clearer that this is an optional feature (and in case I add
more things in the future).
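A minimal sketch of the pattern discussed in this thread, assuming a hypothetical `ElementSampler` class that holds the `el` and `has_element` attributes seen in the diff, plus a hypothetical `Operation` stand-in. This is not Beam's actual data_sampler.py or operations.py code. The per-element path publishes the latest element with two plain attribute writes and no lock, wrapped in the `if` check proposed above. A separate sampling thread (not shown) would call `sample()` periodically.

```python
from typing import Any, List, Optional


class ElementSampler:
    """Hypothetical single-slot handoff between the per-element hot path
    and a periodic sampling thread."""

    def __init__(self) -> None:
        self.el: Any = None
        self.has_element: bool = False
        self._samples: List[Any] = []

    def sample(self) -> None:
        # Intended to be called from the sampling thread on each tick.
        # No lock is taken: elements published between two ticks overwrite
        # each other, so only the latest one is seen and the rest are dropped.
        if self.has_element:
            el = self.el
            # Clearing the flag keeps later ticks from re-sampling the same
            # slot when no new element has been published.
            self.has_element = False
            self._samples.append(el)

    def flush(self) -> List[Any]:
        """Returns and clears the samples collected so far."""
        samples, self._samples = self._samples, []
        return samples


class Operation:
    """Hypothetical stand-in for the operation in the diff."""

    def __init__(self, element_sampler: Optional[ElementSampler] = None) -> None:
        self.element_sampler = element_sampler

    def update_counters_start(self, windowed_value: Any) -> None:
        # Inlined attribute writes instead of a helper-method call, guarded
        # by an if check because sampling is an optional feature.
        if self.element_sampler:
            self.element_sampler.el = windowed_value
            self.element_sampler.has_element = True
```

For example, if two elements arrive before a single tick, only the second one is sampled, and a second tick with no new element adds nothing. When no sampler is configured, the `if` check makes the hot path a no-op.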