This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5c31efb [BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics
new 65dbb4d Merge pull request #8176 from ibzib/update-metrics
5c31efb is described below
commit 5c31efbae5fa40c024a47a44af3e6bc8a079cc2a
Author: Kyle Weaver <[email protected]>
AuthorDate: Fri Mar 29 18:02:01 2019 -0700
[BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics
---
.../runners/core/metrics/MetricsContainerImpl.java | 47 ++++++++++++++++++++++
.../flink/metrics/FlinkMetricContainer.java | 46 +--------------------
2 files changed, 49 insertions(+), 44 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 630f619..bf5acfe 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core.metrics;
+import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
import static
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
@@ -25,15 +26,23 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metric;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Holds the metrics for a single step and uses metric cells that allow
extracting the cumulative
@@ -50,6 +59,7 @@ import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableLis
*/
@Experimental(Kind.METRICS)
public class MetricsContainerImpl implements Serializable, MetricsContainer {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);
@Nullable private final String stepName;
@@ -231,6 +241,43 @@ public class MetricsContainerImpl implements Serializable,
MetricsContainer {
updateGauges(gauges, other.gauges);
}
+ /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
+ public void update(Iterable<MonitoringInfo> monitoringInfos) {
+ monitoringInfos.forEach(
+ monitoringInfo -> {
+ if (monitoringInfo.hasMetric()) {
+ String urn = monitoringInfo.getUrn();
+ MetricName metricName = parseUrn(urn);
+ org.apache.beam.model.pipeline.v1.MetricsApi.Metric metric = monitoringInfo.getMetric();
+ if (metric.hasCounterData()) {
+ CounterData counterData = metric.getCounterData();
+ if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
+ Counter counter = getCounter(metricName);
+ counter.inc(counterData.getInt64Value());
+ } else {
+ LOG.warn("Unsupported CounterData type: {}", counterData);
+ }
+ } else if (metric.hasDistributionData()) {
+ DistributionData distributionData = metric.getDistributionData();
+ if (distributionData.hasIntDistributionData()) {
+ Distribution distribution = getDistribution(metricName);
+ IntDistributionData intDistributionData = distributionData.getIntDistributionData();
+ distribution.update(
+ intDistributionData.getSum(),
+ intDistributionData.getCount(),
+ intDistributionData.getMin(),
+ intDistributionData.getMax());
+ } else {
+ LOG.warn("Unsupported DistributionData type: {}", distributionData);
+ }
+ } else if (metric.hasExtremaData()) {
+ ExtremaData extremaData = metric.getExtremaData();
+ LOG.warn("Extrema metric unsupported: {}", extremaData);
+ }
+ }
+ });
+ }
+
private void updateCounters(
MetricsMap<MetricName, CounterCell> current, MetricsMap<MetricName,
CounterCell> updates) {
for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 87c9afd..8a1781d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,21 +17,14 @@
*/
package org.apache.beam.runners.flink.metrics;
-import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
import static
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
@@ -39,7 +32,6 @@ import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsFilter;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -89,7 +81,7 @@ public class FlinkMetricContainer {
this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
}
- public MetricsContainer getMetricsContainer(String stepName) {
+ public MetricsContainerImpl getMetricsContainer(String stepName) {
return metricsAccumulator != null
? metricsAccumulator.getLocalValue().getContainer(stepName)
: null;
@@ -100,41 +92,7 @@ public class FlinkMetricContainer {
* along to Flink's internal metrics framework.
*/
public void updateMetrics(String stepName, List<MonitoringInfo>
monitoringInfos) {
- MetricsContainer metricsContainer = getMetricsContainer(stepName);
- monitoringInfos.forEach(
- monitoringInfo -> {
- if (monitoringInfo.hasMetric()) {
- String urn = monitoringInfo.getUrn();
- MetricName metricName = parseUrn(urn);
- Metric metric = monitoringInfo.getMetric();
- if (metric.hasCounterData()) {
- CounterData counterData = metric.getCounterData();
- if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
- org.apache.beam.sdk.metrics.Counter counter =
- metricsContainer.getCounter(metricName);
- counter.inc(counterData.getInt64Value());
- } else {
- LOG.warn("Unsupported CounterData type: {}", counterData);
- }
- } else if (metric.hasDistributionData()) {
- DistributionData distributionData = metric.getDistributionData();
- if (distributionData.hasIntDistributionData()) {
- Distribution distribution = metricsContainer.getDistribution(metricName);
- IntDistributionData intDistributionData = distributionData.getIntDistributionData();
- distribution.update(
- intDistributionData.getSum(),
- intDistributionData.getCount(),
- intDistributionData.getMin(),
- intDistributionData.getMax());
- } else {
- LOG.warn("Unsupported DistributionData type: {}", distributionData);
- }
- } else if (metric.hasExtremaData()) {
- ExtremaData extremaData = metric.getExtremaData();
- LOG.warn("Extrema metric unsupported: {}", extremaData);
- }
- }
- });
+ getMetricsContainer(stepName).update(monitoringInfos);
updateMetrics(stepName);
}