ajamato commented on a change in pull request #12822:
URL: https://github.com/apache/beam/pull/12822#discussion_r488797757



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1210,20 +1236,32 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
-    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
+    if BigQueryWriteFn.STREAMING_API_LOGGING_LOCK.acquire(False):

Review comment:
       Can all of this be encapsulated entirely in a new class, or at least in a 
helper that can be called from `finish_bundle`?
   
   Ideally, we can instrument this without adding so much to the original 
implementation.
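   
   As a rough illustration of the kind of helper meant here, a minimal sketch follows. The class name `PeriodicStatsLogger`, its methods, and the `clock`/`log_fn` parameters are all hypothetical and are not part of apache_beam; the sketch only assumes that the lock, the last-reported timestamp, and the frequency check can live in one object, so that `finish_bundle` shrinks to a single call.

```python
import datetime
import logging
import threading
import time

_LOGGER = logging.getLogger(__name__)


class PeriodicStatsLogger(object):
  """Hypothetical helper (not an apache_beam API) for periodic stats logging."""

  def __init__(self, frequency_msec, clock=time.time, log_fn=_LOGGER.info):
    self._frequency_msec = frequency_msec
    self._clock = clock  # injectable so tests can control time
    self._log_fn = log_fn  # injectable so tests can capture output
    self._lock = threading.Lock()
    self._last_reported_millis = int(clock() * 1000)
    self._sections = []  # callables returning a str, or None to skip

  def add_section(self, section_fn):
    """Registers a callable that produces one part of the report."""
    self._sections.append(section_fn)

  def maybe_log(self):
    """Logs a report if the interval has elapsed; returns True if it logged."""
    # Non-blocking acquire: if another thread is reporting, skip this round.
    if not self._lock.acquire(False):
      return False
    try:
      current_millis = int(self._clock() * 1000)
      if current_millis - self._last_reported_millis <= self._frequency_msec:
        return False
      since = datetime.datetime.fromtimestamp(
          self._last_reported_millis / 1000.0)
      lines = ['[Streaming Insert API Statistics since %s]' % since]
      for section_fn in self._sections:
        text = section_fn()
        if text:
          lines.append(text)
      self._log_fn('\n'.join(lines))
      self._last_reported_millis = current_millis
      return True
    finally:
      self._lock.release()
```

   With something along these lines, a single instance could be held at class level (as the current lock is), each statistic (for example the latency histogram) could register its own section, and `finish_bundle` would only need to call `maybe_log()`. The injectable clock and log function are one possible way to make the branches easy to cover in unit tests.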

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1210,20 +1236,32 @@ def process(self, element, *schema_side_inputs):
       return self._flush_all_batches()
 
   def finish_bundle(self):
-    if BigQueryWriteFn.LATENCY_LOGGING_LOCK.acquire(False):
+    if BigQueryWriteFn.STREAMING_API_LOGGING_LOCK.acquire(False):
       try:
         current_millis = int(time.time() * 1000)
-        if (BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.total_count() > 0 and
-            (current_millis -
-             BigQueryWriteFn.LATENCY_LOGGING_LAST_REPORTED_MILLIS) >
-            self._latency_logging_frequency_msec):
-          _LOGGER.info(
-              BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.get_percentile_info(
-                  'streaming insert requests', 'ms'))
-          BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.clear()
-          BigQueryWriteFn.LATENCY_LOGGING_LAST_REPORTED_MILLIS = current_millis
+        if ((current_millis -
+             BigQueryWriteFn.STREAMING_API_LOGGING_LAST_REPORTED_MILLIS) >
+            self._streaming_api_logging_frequency_msec):
+          streaming_api_info = [
+              '[Streaming Insert API Statistics since %s]' %
+              datetime.datetime.fromtimestamp(
+                  BigQueryWriteFn.STREAMING_API_LOGGING_LAST_REPORTED_MILLIS /
+                  1000.0)
+          ]
+          if BigQueryWriteFn.LATENCY_LOGGING_HISTOGRAM.total_count() > 0:
+            streaming_api_info.append(

Review comment:
       Please address the code coverage warnings.
   
   Codecov (codecov/patch):
   sdks/python/apache_beam/io/gcp/bigquery.py#L1252
   Added line #L1252 was not covered by tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org