[
https://issues.apache.org/jira/browse/BEAM-14408?focusedWorklogId=775261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775261
]
ASF GitHub Bot logged work on BEAM-14408:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/May/22 22:14
Start Date: 26/May/22 22:14
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17771:
URL: https://github.com/apache/beam/pull/17771#discussion_r883121794
##########
sdks/python/apache_beam/runners/worker/opcounters.py:
##########
@@ -219,9 +219,14 @@ def update_from_batch(self, windowed_batch):
assert self.producer_batch_converter is not None
assert isinstance(windowed_batch, windowed_value.HomogeneousWindowedBatch)
- self.element_counter.update(
- self.producer_batch_converter.get_length(windowed_batch.values))
- # TODO(BEAM-14408): Update byte size estimate
+ batch_length = self.producer_batch_converter.get_length(
+ windowed_batch.values)
+ self.element_counter.update(batch_length)
+
+ mean_element_size = self.producer_batch_converter.estimate_byte_size(
+ windowed_batch.values) / batch_length
+ for _ in range(batch_length):
+ self.mean_byte_counter.update(mean_element_size)
Review Comment:
This seems very naive, but I'm not sure of another way to keep this
consistent with the element-wise behaviour, which tracks the distribution
of _element_ byte size estimates. Definitely open to suggestions here.
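The approach in the diff can be sketched in isolation. The snippet below is a
hypothetical, simplified model (`MeanCounter` and `update_from_batch` are
illustration-only stand-ins, not Beam's actual `opcounters` classes): a
batch-level byte-size estimate is spread evenly across the batch's elements,
and the mean counter is updated once per element so its sample count matches
the element-wise code path. It also guards the division against an empty
batch, which the diffed code does not.

```python
# Hypothetical sketch, not Beam's real implementation: spreading a
# batch-level byte-size estimate evenly across elements.

class MeanCounter:
    """Minimal running-mean counter standing in for Beam's mean byte counter."""
    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, value):
        self.total += value
        self.count += 1

    def mean(self):
        return self.total / self.count if self.count else 0.0


def update_from_batch(counter, batch_byte_size, batch_length):
    """Update `counter` as if each element had the batch's mean byte size."""
    # Guard against empty batches to avoid a ZeroDivisionError.
    if batch_length == 0:
        return
    mean_element_size = batch_byte_size / batch_length
    # Update once per element so the counter is weighted the same way
    # the element-wise code path would weight it.
    for _ in range(batch_length):
        counter.update(mean_element_size)


counter = MeanCounter()
update_from_batch(counter, batch_byte_size=4096, batch_length=8)
print(counter.mean())  # 512.0
print(counter.count)   # 8
```

The per-element loop is what keeps the counter consistent with the
element-wise path: a single `update(batch_byte_size)` call would give the
batch the weight of one element in the distribution instead of
`batch_length` elements.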
Issue Time Tracking
-------------------
Worklog Id: (was: 775261)
Time Spent: 20m (was: 10m)
> batch-consuming DoFns should estimate byte size
> -----------------------------------------------
>
> Key: BEAM-14408
> URL: https://issues.apache.org/jira/browse/BEAM-14408
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)