ihji commented on a change in pull request #12754:
URL: https://github.com/apache/beam/pull/12754#discussion_r483281935



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
+    current_millis = int(time.time() * 1000)
+    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):

Review comment:
       The lock was introduced so that threads in a single Python process can 
share the same histogram object. My first attempt used a separate histogram 
object per DoFn, but it produced too many logs because streaming pipelines 
create dozens of DoFns per process (# of workers * # of cores * # of DoFn 
threads).
   
   There are two locks: the first synchronizes access to the histogram object 
itself, and the second only guards printing the logs at a given interval. For 
the second lock, other threads don't need to wait while one thread holds it. 
Whichever thread gets there first does the job, and the others just skip the 
whole code block.
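
The two-lock arrangement described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `LatencyReporter`, its methods, the injected `clock`, and the plain list standing in for the histogram are all hypothetical names chosen for the example. Only `threading.Lock` and its non-blocking `acquire(False)` are standard library behavior.

```python
import threading
import time


class LatencyReporter(object):
  """Hypothetical stand-in for the shared state; not the code in this PR."""

  def __init__(self, interval_secs, clock=time.time):
    self._histogram_lock = threading.Lock()  # first lock: guards the data
    self._logging_lock = threading.Lock()    # second lock: guards reporting
    self._latencies = []                     # stand-in for a histogram
    self._interval_millis = int(interval_secs * 1000)
    self._clock = clock
    self._last_report_millis = int(clock() * 1000)
    self.reports = []                        # a real version would log instead

  def record(self, latency_millis):
    # Blocking acquire: every thread waits, so no sample is dropped.
    with self._histogram_lock:
      self._latencies.append(latency_millis)

  def maybe_report(self):
    # Non-blocking acquire: if another thread is already reporting,
    # return immediately instead of waiting.
    if not self._logging_lock.acquire(False):
      return False
    try:
      now = int(self._clock() * 1000)
      if now - self._last_report_millis < self._interval_millis:
        return False
      # Take a snapshot under the first lock, then reset the data.
      with self._histogram_lock:
        snapshot, self._latencies = self._latencies, []
      self._last_report_millis = now
      self.reports.append(snapshot)
      return True
    finally:
      self._logging_lock.release()
```

In this sketch a caller would invoke `record` for each measured request and `maybe_report` from something like `finish_bundle`. Threads that lose the race on the second lock pay only the cost of one failed `acquire` call.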

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
+    current_millis = int(time.time() * 1000)
+    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):

Review comment:
       `current_millis` is only used to check whether enough time has passed 
since the last log report. Calling `acquire` with `False` was intentional, to 
minimize the performance impact of log reporting. Please see the other comment 
above.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1198,8 +1209,34 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
+    current_millis = int(time.time() * 1000)

Review comment:
       > Are we sure finishBundle is called frequently enough?
   
   Yes, for streaming pipelines. The bundle size is sufficiently small.
   
   > Are we sure this code will get invoked when there are failures to write to 
BQ?
   
   Yes, there's a test case for that.





