ajamato commented on a change in pull request #11184: [BEAM-4374] Update protos 
related to MonitoringInfo.
URL: https://github.com/apache/beam/pull/11184#discussion_r398805578
 
 

 ##########
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
 ##########
 @@ -446,7 +447,11 @@ long getInputElementsConsumed(final 
Iterable<MonitoringInfo> monitoringInfos) {
         String pcollection =
             mi.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION, 
null);
         if (pcollection != null && 
pcollection.equals(grpcReadTransformOutputPCollectionName)) {
-          return mi.getMetric().getCounterData().getInt64Value();
+          try {
+            return VARINT_CODER.decode(mi.getPayload().newInput());
 
 Review comment:
   ditto, this is also too low level, please use a helper here. Let's try to 
encapsulate the bytes payload format of MonitoringInfos.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to