piotr-szuberski commented on a change in pull request #11661:
URL: https://github.com/apache/beam/pull/11661#discussion_r432061809
##########
File path: sdks/python/apache_beam/examples/wordcount_it_test.py
##########
@@ -84,11 +87,45 @@ def _run_wordcount_it(self, run_wordcount, **opts):
# Register clean up before pipeline execution
self.addCleanup(delete_files, [test_output + '*'])
+ publish_to_bq = bool(
+ test_pipeline.get_option('publish_to_big_query') or False)
+
+ # Start measure time for performance test
+ start_time = time.time()
+
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
run_wordcount(
test_pipeline.get_full_options_as_args(**extra_opts),
- save_main_session=False)
+ save_main_session=False,
+ )
+
+ end_time = time.time()
+ run_time = end_time - start_time
+
+ if publish_to_bq:
+ self._publish_metrics(test_pipeline, run_time)
+
+ def _publish_metrics(self, pipeline, metric_value):
+ influx_options = InfluxDBMetricsPublisherOptions(
+ pipeline.get_option('influx_measurement'),
+ pipeline.get_option('influx_db_name'),
+ pipeline.get_option('influx_hostname'),
+ os.getenv('INFLUXDB_USER'),
+ os.getenv('INFLUXDB_USER_PASSWORD'),
+ )
+ metric_reader = MetricsReader(
+ project_name=pipeline.get_option('project'),
+ bq_table=pipeline.get_option('metrics_table'),
+ bq_dataset=pipeline.get_option('metrics_dataset'),
+ publish_to_bq=True,
+ influxdb_options=influx_options,
+ )
+
+ metric_reader.publish_values((
+ metric_value,
Review comment:
Good point, I renamed it to wordcount_it_runtime and swapped the order
of the key and value.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]