[
https://issues.apache.org/jira/browse/BEAM-9488?focusedWorklogId=426802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-426802
]
ASF GitHub Bot logged work on BEAM-9488:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/20 00:26
Start Date: 24/Apr/20 00:26
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #11514:
URL: https://github.com/apache/beam/pull/11514#discussion_r414208960
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
- for mi in op.monitoring_infos(transform_id).values():
- fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
- all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
- infos_list = list(all_monitoring_infos_dict.values())
-
- def inject_pcollection(monitoring_info):
- """
- If provided metric is element count metric:
- Finds relevant transform output info in current process_bundle_descriptor
- and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
- info.
- """
- if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
- if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
- return
- ptransform_label = monitoring_info.labels[
- monitoring_infos.PTRANSFORM_LABEL]
- if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
- return
- tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
- if not ptransform_label in self.process_bundle_descriptor.transforms:
- return
- if not tag_label in self.process_bundle_descriptor.transforms[
- ptransform_label].outputs:
Review comment:
Likewise this.
##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -337,43 +337,48 @@ def add_receiver(self, operation, output_index=0):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)
- def monitoring_infos(self, transform_id):
- # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+ def monitoring_infos(self, transform_id, pcollection_ids):
+ # type: (str, list(str)) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
- self.pcollection_count_monitoring_infos(transform_id))
+ self.pcollection_count_monitoring_infos(pcollection_ids))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos
- def pcollection_count_monitoring_infos(self, transform_id):
+ def pcollection_count_monitoring_infos(self, pcollection_ids):
"""Returns the element count MonitoringInfo collected by this operation."""
- if len(self.receivers) == 1:
- # If there is exactly one output, we can unambiguously
- # fix its name later, which we do.
- # TODO(robertwb): Plumb the actual name here.
+ if len(self.receivers) != len(pcollection_ids):
+ raise RuntimeError(
+ 'Unexpected number of receivers for number of pcollections %s %s' %
+ (self.receivers, pcollection_ids))
+
+ all_monitoring_infos = {}
+ for i in range(len(self.receivers)):
Review comment:
This will change if you use a mapping, but `zip` would be the idiom to
use here.
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
- for mi in op.monitoring_infos(transform_id).values():
- fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
- all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
- infos_list = list(all_monitoring_infos_dict.values())
-
- def inject_pcollection(monitoring_info):
- """
- If provided metric is element count metric:
- Finds relevant transform output info in current process_bundle_descriptor
- and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
- info.
- """
- if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
- if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
- return
- ptransform_label = monitoring_info.labels[
- monitoring_infos.PTRANSFORM_LABEL]
- if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
- return
- tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
- if not ptransform_label in self.process_bundle_descriptor.transforms:
- return
- if not tag_label in self.process_bundle_descriptor.transforms[
- ptransform_label].outputs:
- return
-
- pcollection_name = (
- self.process_bundle_descriptor.transforms[ptransform_label].
- outputs[tag_label])
-
- monitoring_info.labels[
- monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
- # Cleaning up labels that are not in specification.
- monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
- monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
- for mi in infos_list:
- inject_pcollection(mi)
-
- return infos_list
+ pcollection_ids = self.process_bundle_descriptor.transforms[
Review comment:
In practice this might be OK (dict iteration order is formally undefined, though
I believe it is deterministic when the dict is not modified), but it seems rather
brittle to me. Could we instead pass the tag -> pcollection_id mapping here?
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
- for mi in op.monitoring_infos(transform_id).values():
- fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
- all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
- infos_list = list(all_monitoring_infos_dict.values())
-
- def inject_pcollection(monitoring_info):
- """
- If provided metric is element count metric:
- Finds relevant transform output info in current process_bundle_descriptor
- and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
- info.
- """
- if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
- if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
- return
- ptransform_label = monitoring_info.labels[
- monitoring_infos.PTRANSFORM_LABEL]
- if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
- return
- tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
- if not ptransform_label in self.process_bundle_descriptor.transforms:
- return
Review comment:
This would be a bug, right?
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1035,59 +1035,12 @@ def monitoring_infos(self):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
- for mi in op.monitoring_infos(transform_id).values():
- fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
- all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
- infos_list = list(all_monitoring_infos_dict.values())
-
- def inject_pcollection(monitoring_info):
- """
- If provided metric is element count metric:
- Finds relevant transform output info in current process_bundle_descriptor
- and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
- info.
- """
- if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
- if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
- return
- ptransform_label = monitoring_info.labels[
- monitoring_infos.PTRANSFORM_LABEL]
- if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
- return
- tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
- if not ptransform_label in self.process_bundle_descriptor.transforms:
- return
- if not tag_label in self.process_bundle_descriptor.transforms[
- ptransform_label].outputs:
Review comment:
Actually, this can happen, and might be what's happening here. There is
no PCollection for this tag, but the user outputted a value to this tag. It
would make sense to record this output even if we didn't use it. This is
another downside of attaching these counters to PCollections themselves rather
than to PTransform outputs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 426802)
Time Spent: 50m (was: 40m)
> Python SDK sending unexpected MonitoringInfo
> --------------------------------------------
>
> Key: BEAM-9488
> URL: https://issues.apache.org/jira/browse/BEAM-9488
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Ruoyun Huang
> Assignee: Luke Cwik
> Priority: Minor
> Time Spent: 50m
> Remaining Estimate: 0h
>
> element_count metrics is supposed to be tied with pcollection ids, but by
> inspecting what is sent over by python sdk, we see there are monitoringInfo
> sent with ptransforms in it.
> [Double checked the job graph, these seem to be redundant. i.e. the
> corresponding pcollection does have its own MonitoringInfo reported.]
> Likely a bug.
> Proof:
> urn: "beam:metric:element_count:v1"
> type: "beam:metrics:sum_int_64"
> metric {
> counter_data {
> int64_value: 1
> }
> }
> labels {
> key: "PTRANSFORM"
> value: "start/MaybeReshuffle/Reshuffle/RemoveRandomKeys-ptransform-85"
> }
> labels {
> key: "TAG"
> value: "None"
> }
> timestamp {
> seconds: 1583949073
> nanos: 842402935
> }
--
This message was sent by Atlassian Jira
(v8.3.4#803005)