ajamato commented on a change in pull request #13017: URL: https://github.com/apache/beam/pull/13017#discussion_r501390668
##########
File path: sdks/python/apache_beam/metrics/cells.py
##########
@@ -238,6 +250,57 @@ def to_runner_api_monitoring_info(self, name, transform_id):
         ptransform=transform_id)
+class HistogramCell(MetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value and delta for a histogram metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe since underlying histogram object is thread safe.
+  """
+  def __init__(self, bucket_type):
+    self._bucket_type = bucket_type
+    self.data = HistogramAggregator(bucket_type).identity_element()
+
+  def reset(self):
+    self.data = HistogramAggregator(self._bucket_type).identity_element()
+
+  def combine(self, other):
+    # type: (HistogramCell) -> HistogramCell
+    result = HistogramCell(self._bucket_type)
+    result.data = self.data.combine(other.data)
+    return result
+
+  def update(self, value):
+    self.data.histogram.record(value)
+
+  def get_cumulative(self):
+    # type: () -> HistogramData
+    return self.data.get_cumulative()
+
+  def to_runner_api_monitoring_info(self, name, transform_id):

Review comment:
Might be good to ensure this isn't called. Can we throw not implemented? Or if that breaks other code, at least comment why this isn't populated yet.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1102,7 +1096,7 @@ def __init__(
       retry_strategy=None,
       additional_bq_parameters=None,
       ignore_insert_ids=False,
-      latency_logging_frequency_sec=None):
+      streaming_api_logging_frequency_sec=None):

Review comment:
Can this be obtained without plumbing to the ctor. I.e. as a pipeline option accessed directly, or constant in this file?
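
To illustrate the first comment above (on `to_runner_api_monitoring_info`), here is a minimal sketch of "throw not implemented". It assumes a stand-in class, `SketchHistogramCell`, which is hypothetical and is not the `HistogramCell` from the PR; it only shows the shape of failing loudly instead of returning `None`.

```python
class SketchHistogramCell(object):
  """Hypothetical stand-in for a histogram cell; not the Beam class."""
  def __init__(self):
    # A plain list stands in for the real histogram data structure.
    self.values = []

  def update(self, value):
    self.values.append(value)

  def to_runner_api_monitoring_info(self, name, transform_id):
    # Raise instead of returning None, so an accidental call is noticed
    # rather than silently dropping the metric.
    raise NotImplementedError(
        'Histogram cells are not reported as monitoring infos yet.')
```

If raising breaks existing callers that iterate over all cells, the alternative raised in the comment (keep the `None` return and document why it is not populated) would apply instead.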
##########
File path: sdks/python/apache_beam/metrics/metric.py
##########
@@ -293,3 +339,49 @@ def with_steps(self, steps):
     self._steps.update(steps)
     return self
+
+
+class MetricLogger(object):

Review comment:
Can you test that this class works if you change the current metric environment? We should make sure that all instances of the metric on all MetricEnvironments will be logged, as the DelegatingMetric's value changes based on the MetricEnvironment it is set in.

I suspect what you are doing works fine, as long as you keep a separate timer for each MetricEnvironment. I.e. imagine two transforms with a metric in two separate MetricEnvironments. If they are sharing the same timer in the same MetricLogger, it may log for one metric environment, but not the other one, when it tries to log.

Note:
- Every time a new bundle is processed, a new MetricEnvironment is created, which will clear the metrics.
- Separate transforms, or code running in separate threads, will store metrics in separate MetricEnvironments.
- DelegatingMetrics always use the current MetricEnvironment to store a metric, or get a metric value (this is a thread-local variable which is reset when a new bundle is processed, a new transform is executing, etc.).

##########
File path: sdks/python/apache_beam/metrics/cells.py
##########
@@ -238,6 +250,57 @@ def to_runner_api_monitoring_info(self, name, transform_id):
         ptransform=transform_id)
+class HistogramCell(MetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value and delta for a histogram metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe since underlying histogram object is thread safe.
+  """
+  def __init__(self, bucket_type):
+    self._bucket_type = bucket_type
+    self.data = HistogramAggregator(bucket_type).identity_element()
+
+  def reset(self):
+    self.data = HistogramAggregator(self._bucket_type).identity_element()
+
+  def combine(self, other):
+    # type: (HistogramCell) -> HistogramCell
+    result = HistogramCell(self._bucket_type)
+    result.data = self.data.combine(other.data)
+    return result
+
+  def update(self, value):
+    self.data.histogram.record(value)
+
+  def get_cumulative(self):
+    # type: () -> HistogramData
+    return self.data.get_cumulative()
+
+  def to_runner_api_monitoring_info(self, name, transform_id):
+    return None
+
+
+class HistogramCellFactory(MetricCellFactory):

Review comment:
Is this class necessary? It seems like it completely defers to HistogramCell, regardless of bucket_type or any parameters. Can it be removed? Do any of the other Cell classes have a Factory like this? I didn't notice this elsewhere.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_tools.py
##########
@@ -271,6 +276,11 @@ def __init__(self, client=None):
     # randomized prefix for row IDs.
     self._row_id_prefix = '' if client else uuid.uuid4()
     self._temporary_table_suffix = uuid.uuid4().hex
+    self._latency_histogram_metric = Metrics.histogram(

Review comment:
We should make it clear that this isn't fully supported as a user metric type in the SDK, or in any Runners. We may need to relocate this from Metrics.histogram to somewhere else, e.g. InternalMetrics.histogram.

@pabloem any ideas what we ought to do here? This metric isn't fully implemented, but the API allows us to log the metric locally.

##########
File path: sdks/python/apache_beam/metrics/cells.py
##########
@@ -238,6 +250,57 @@ def to_runner_api_monitoring_info(self, name, transform_id):
         ptransform=transform_id)
+class HistogramCell(MetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value and delta for a histogram metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe since underlying histogram object is thread safe.
+  """
+  def __init__(self, bucket_type):
+    self._bucket_type = bucket_type
+    self.data = HistogramAggregator(bucket_type).identity_element()
+
+  def reset(self):
+    self.data = HistogramAggregator(self._bucket_type).identity_element()
+
+  def combine(self, other):
+    # type: (HistogramCell) -> HistogramCell
+    result = HistogramCell(self._bucket_type)
+    result.data = self.data.combine(other.data)
+    return result
+
+  def update(self, value):
+    self.data.histogram.record(value)
+
+  def get_cumulative(self):
+    # type: () -> HistogramData
+    return self.data.get_cumulative()
+
+  def to_runner_api_monitoring_info(self, name, transform_id):
+    return None
+
+
+class HistogramCellFactory(MetricCellFactory):

Review comment:
Additionally, it's not clear to me that MetricCellFactory is necessary.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org