[
https://issues.apache.org/jira/browse/BEAM-10791?focusedWorklogId=478830&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-478830
]
ASF GitHub Bot logged work on BEAM-10791:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Sep/20 22:14
Start Date: 03/Sep/20 22:14
Worklog Time Spent: 10m
Work Description: ihji commented on a change in pull request #12754:
URL: https://github.com/apache/beam/pull/12754#discussion_r483281935
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
return self._flush_all_batches()
def finish_bundle(self):
+ current_millis = int(time.time() * 1000)
+ if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
Review comment:
Lock was introduced for sharing the same histogram object between
threads in a single python process. My first attempt was having a separate
histogram object per DoFn but it produced too many logs since we're creating
dozens of DoFns per process with streaming pipelines (# of workers * # of cores
* # of DoFn threads).
There are two locks: the first one is for synchronizing histogram object
itself and the second one is only for printing out the logs with a given
interval. For the second lock, other threads don't need to wait if one thread
occupies the lock. It's okay that any one of threads puts its hands up first
and does the job and others just skip the whole code block.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
return self._flush_all_batches()
def finish_bundle(self):
+ current_millis = int(time.time() * 1000)
+ if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
Review comment:
`current_millis` is only for checking whether the enough amount of time
passed after the last log reporting time. `acquire` with the false parameter
was intentional to minimize performance impact for log reporting. Please see
the other comment above.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
return self._flush_all_batches()
def finish_bundle(self):
+ current_millis = int(time.time() * 1000)
Review comment:
> Are we sure finishBundle is called frequently enough?
Yes, for streaming pipeline. The bundle size is sufficiently small.
> Are we sure this code will get invoked when there are failures to write to
BQ?
Yes, there's a test case for that.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 478830)
Time Spent: 50m (was: 40m)
> Identify and log additional information needed to debug streaming insert
> requests for Python SDK
> ------------------------------------------------------------------------------------------------
>
> Key: BEAM-10791
> URL: https://issues.apache.org/jira/browse/BEAM-10791
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 50m
> Remaining Estimate: 0h
>
> implement logging for per worker statistics:
> - Request count - for that window.
> - Error codes + number of occurrences for that window (Or perhaps just log
> each error with as much detail as possible.)
> - Tail latencies of requests (50, 90 and 99, percentiles)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)