[ 
https://issues.apache.org/jira/browse/BEAM-4775?focusedWorklogId=201129&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-201129
 ]

ASF GitHub Bot logged work on BEAM-4775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/19 07:40
            Start Date: 20/Feb/19 07:40
    Worklog Time Spent: 10m 
      Work Description: ryan-williams commented on pull request #7876: 
[BEAM-4775] Clean up metric protos; support integer distributions, gauges
URL: https://github.com/apache/beam/pull/7876#discussion_r258362774
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
 ##########
 @@ -94,121 +93,128 @@ public MetricsContainer getMetricsContainer(String 
stepName) {
         : null;
   }
 
+  public MetricsContainer getUnboundMetricsContainer() {
+    return metricsAccumulator != null
+        ? metricsAccumulator.getLocalValue().getUnboundContainer()
+        : null;
+  }
+
   /**
    * Update this container with metrics from the passed {@link 
MonitoringInfo}s, and send updates
    * along to Flink's internal metrics framework.
    */
-  public void updateMetrics(String stepName, List<MonitoringInfo> 
monitoringInfos) {
-    MetricsContainer metricsContainer = getMetricsContainer(stepName);
+  public void updateMetrics(List<MonitoringInfo> monitoringInfos) {
+    LOG.info("Flink updating metrics with {} monitoring infos", 
monitoringInfos.size());
     monitoringInfos.forEach(
         monitoringInfo -> {
-          if (monitoringInfo.hasMetric()) {
-            String urn = monitoringInfo.getUrn();
-            MetricName metricName = parseUrn(urn);
-            Metric metric = monitoringInfo.getMetric();
-            if (metric.hasCounterData()) {
-              CounterData counterData = metric.getCounterData();
-              if (counterData.getValueCase() == 
CounterData.ValueCase.INT64_VALUE) {
-                org.apache.beam.sdk.metrics.Counter counter =
-                    metricsContainer.getCounter(metricName);
-                counter.inc(counterData.getInt64Value());
-              } else {
-                LOG.warn("Unsupported CounterData type: {}", counterData);
-              }
-            } else if (metric.hasDistributionData()) {
-              DistributionData distributionData = metric.getDistributionData();
-              if (distributionData.hasIntDistributionData()) {
-                Distribution distribution = 
metricsContainer.getDistribution(metricName);
-                IntDistributionData intDistributionData = 
distributionData.getIntDistributionData();
-                distribution.update(
-                    intDistributionData.getSum(),
-                    intDistributionData.getCount(),
-                    intDistributionData.getMin(),
-                    intDistributionData.getMax());
-              } else {
-                LOG.warn("Unsupported DistributionData type: {}", 
distributionData);
-              }
-            } else if (metric.hasExtremaData()) {
-              ExtremaData extremaData = metric.getExtremaData();
-              LOG.warn("Extrema metric unsupported: {}", extremaData);
-            }
+          if (!monitoringInfo.hasMetric()) {
 
 Review comment:
   Interesting. It seems in this case like it's easier and better to just 
handle all the metrics that come through conforming to the standard format? Or 
maybe I'm not understanding what you mean.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 201129)
    Time Spent: 14h 40m  (was: 14.5h)

> JobService should support returning metrics
> -------------------------------------------
>
>                 Key: BEAM-4775
>                 URL: https://issues.apache.org/jira/browse/BEAM-4775
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>              Labels: triaged
>          Time Spent: 14h 40m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto]
>  currently doesn't appear to have a way for JobService to return metrics to a 
> user, even though 
> [https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto]
>  includes support for reporting SDK metrics to the runner harness.
>  
> Metrics are apparently necessary to run any ValidatesRunner tests because 
> PAssert needs to validate that the assertions succeeded. However, this 
> statement should be double-checked: perhaps it's possible to somehow work 
> with PAssert without metrics support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to