dianfu commented on a change in pull request #11543: [FLINK-16672][python] 
Support Counter, Gauge, Meter, Distribution metric type for Python UDF
URL: https://github.com/apache/flink/pull/11543#discussion_r399318521
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
 ##########
 @@ -19,17 +19,267 @@
 package org.apache.flink.python.metric;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Helper class for forwarding Python metrics to Java accumulators and metrics.
  */
 @Internal
-public class FlinkMetricContainer {
+public final class FlinkMetricContainer {
+
+       private static final String METRIC_KEY_SEPARATOR =
+               
GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);
 
+       private final MetricsContainerStepMap metricsContainers;
        private final MetricGroup baseMetricGroup;
+       private final Map<String, Counter> flinkCounterCache;
+       private final Map<String, Meter> flinkMeterCache;
+       private final Map<String, FlinkDistributionGauge> 
flinkDistributionGaugeCache;
+       private final Map<String, FlinkGauge> flinkGaugeCache;
 
        public FlinkMetricContainer(MetricGroup metricGroup) {
-               this.baseMetricGroup = metricGroup;
+               this.baseMetricGroup = checkNotNull(metricGroup);
+               this.flinkCounterCache = new HashMap<>();
+               this.flinkMeterCache = new HashMap<>();
+               this.flinkDistributionGaugeCache = new HashMap<>();
+               this.flinkGaugeCache = new HashMap<>();
+               this.metricsContainers = new MetricsContainerStepMap();
+       }
+
+       private MetricsContainerImpl getMetricsContainer(String stepName) {
+               return metricsContainers.getContainer(stepName);
+       }
+
+       /**
+        * Update this container with metrics from the passed {@link 
MonitoringInfo}s, and send updates
+        * along to Flink's internal metrics framework.
+        */
+       public void updateMetrics(String stepName, List<MonitoringInfo> 
monitoringInfos) {
+               getMetricsContainer(stepName).update(monitoringInfos);
+               updateMetrics(stepName);
+       }
+
+       /**
+        * Update Flink's internal metrics ({@link this#flinkCounterCache}) 
with the latest metrics for
+        * a given step.
+        */
+       private void updateMetrics(String stepName) {
+               MetricResults metricResults = 
asAttemptedOnlyMetricResults(metricsContainers);
+               MetricQueryResults metricQueryResults =
+                       
metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
+               updateCounterOrMeter(metricQueryResults.getCounters());
+               updateDistributions(metricQueryResults.getDistributions());
+               updateGauge(metricQueryResults.getGauges());
+       }
+
+       private boolean isUserMetric(MetricResult metricResult) {
+               MetricName metricName = metricResult.getKey().metricName();
+               return (metricName instanceof MonitoringInfoMetricName) &&
+                       ((MonitoringInfoMetricName) metricName).getUrn()
+                               
.contains(MonitoringInfoConstants.Urns.USER_COUNTER);
+       }
+
+       private void updateCounterOrMeter(Iterable<MetricResult<Long>> 
counters) {
+               for (MetricResult<Long> metricResult : counters) {
+                       if (!isUserMetric(metricResult)) {
+                               continue;
+                       }
+                       // get identifier
+                       String flinkMetricIdentifier = 
getFlinkMetricIdentifierString(metricResult.getKey());
+
+                       // get metric type
+                       ArrayList<String> scopeComponents = 
getNameSpaceArray(metricResult.getKey());
+                       if ((scopeComponents.size() % 2) != 0) {
+                               Meter meter;
+                               if 
(flinkMeterCache.containsKey(flinkMetricIdentifier)) {
+                                       meter = 
flinkMeterCache.get(flinkMetricIdentifier);
+                               } else {
+                                       int timeSpanInSeconds =
+                                               
Integer.valueOf(scopeComponents.get(scopeComponents.size() - 1));
+                                       MetricGroup metricGroup =
+                                               
registerMetricGroup(metricResult.getKey(), baseMetricGroup);
+                                       meter = metricGroup.meter(
+                                               
metricResult.getKey().metricName().getName(),
+                                               new 
MeterView(timeSpanInSeconds));
+                                       
flinkMeterCache.put(flinkMetricIdentifier, meter);
+                               }
+
+                               Long update = metricResult.getAttempted();
+                               meter.markEvent(update);
+                       } else {
+                               Counter counter;
+                               if 
(flinkCounterCache.containsKey(flinkMetricIdentifier)) {
+                                       counter = 
flinkCounterCache.get(flinkMetricIdentifier);
+                               } else {
+                                       MetricGroup metricGroup =
+                                               
registerMetricGroup(metricResult.getKey(), baseMetricGroup);
+                                       counter = 
metricGroup.counter(metricResult.getKey().metricName().getName());
+                                       
flinkCounterCache.put(flinkMetricIdentifier, counter);
+                               }
+
+                               Long update = metricResult.getAttempted();
+                               counter.inc(update - counter.getCount());
+                       }
+               }
+       }
+
+       private void 
updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
+               for (MetricResult<DistributionResult> metricResult : 
distributions) {
+                       if (!isUserMetric(metricResult)) {
+                               continue;
+                       }
+                       // get identifier
+                       String flinkMetricIdentifier = 
getFlinkMetricIdentifierString(metricResult.getKey());
+                       DistributionResult update = metricResult.getAttempted();
+
+                       // update flink metric
+                       FlinkDistributionGauge gauge = 
flinkDistributionGaugeCache.get(flinkMetricIdentifier);
+                       if (gauge == null) {
+                               MetricGroup metricGroup =
+                                       
registerMetricGroup(metricResult.getKey(), baseMetricGroup);
+                               gauge = metricGroup.gauge(
+                                       
metricResult.getKey().metricName().getName(),
+                                       new FlinkDistributionGauge(update));
+                               
flinkDistributionGaugeCache.put(flinkMetricIdentifier, gauge);
+                       } else {
+                               gauge.update(update);
+                       }
+               }
+       }
+
+       private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
+               for (MetricResult<GaugeResult> metricResult : gauges) {
+                       if (!isUserMetric(metricResult)) {
+                               continue;
+                       }
+                       // get identifier
+                       String flinkMetricIdentifier = 
getFlinkMetricIdentifierString(metricResult.getKey());
+
+                       GaugeResult update = metricResult.getAttempted();
+
+                       // update flink metric
+                       FlinkGauge gauge = 
flinkGaugeCache.get(flinkMetricIdentifier);
+                       if (gauge == null) {
+                               MetricGroup metricGroup = 
registerMetricGroup(metricResult.getKey(), baseMetricGroup);
+                               gauge = metricGroup.gauge(
+                                       
metricResult.getKey().metricName().getName(),
+                                       new FlinkGauge(update));
+                               flinkGaugeCache.put(flinkMetricIdentifier, 
gauge);
+                       } else {
+                               gauge.update(update);
+                       }
+               }
+       }
+
+       @VisibleForTesting
+       static ArrayList getNameSpaceArray(MetricKey metricKey) {
+               MetricName metricName = metricKey.metricName();
+               try {
+                       return new 
ObjectMapper().readValue(metricName.getNamespace(), ArrayList.class);
+               } catch (JsonProcessingException e) {
+                       e.printStackTrace();
 
 Review comment:
   remove this line and set it in the following RuntimeException?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to