[
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=209822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-209822
]
ASF GitHub Bot logged work on BEAM-6165:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Mar/19 22:50
Start Date: 07/Mar/19 22:50
Worklog Time Spent: 10m
Work Description: ryan-williams commented on pull request #7971:
[BEAM-6165] Flink portable metrics: get ptransform from MonitoringInfo, not stage name
URL: https://github.com/apache/beam/pull/7971#discussion_r263590292
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
##########
@@ -99,117 +103,145 @@ public MetricsContainer getMetricsContainer(String
stepName) {
* Update this container with metrics from the passed {@link
MonitoringInfo}s, and send updates
* along to Flink's internal metrics framework.
*/
- public void updateMetrics(String stepName, List<MonitoringInfo>
monitoringInfos) {
- MetricsContainer metricsContainer = getMetricsContainer(stepName);
+ public void updateMetrics(List<MonitoringInfo> monitoringInfos) {
monitoringInfos.forEach(
monitoringInfo -> {
- if (monitoringInfo.hasMetric()) {
- String urn = monitoringInfo.getUrn();
- MetricName metricName = parseUrn(urn);
- Metric metric = monitoringInfo.getMetric();
- if (metric.hasCounterData()) {
- CounterData counterData = metric.getCounterData();
- if (counterData.getValueCase() ==
CounterData.ValueCase.INT64_VALUE) {
- org.apache.beam.sdk.metrics.Counter counter =
- metricsContainer.getCounter(metricName);
- counter.inc(counterData.getInt64Value());
- } else {
- LOG.warn("Unsupported CounterData type: {}", counterData);
- }
- } else if (metric.hasDistributionData()) {
- DistributionData distributionData = metric.getDistributionData();
- if (distributionData.hasIntDistributionData()) {
- Distribution distribution =
metricsContainer.getDistribution(metricName);
- IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
- distribution.update(
- intDistributionData.getSum(),
- intDistributionData.getCount(),
- intDistributionData.getMin(),
- intDistributionData.getMax());
- } else {
- LOG.warn("Unsupported DistributionData type: {}",
distributionData);
- }
- } else if (metric.hasExtremaData()) {
- ExtremaData extremaData = metric.getExtremaData();
- LOG.warn("Extrema metric unsupported: {}", extremaData);
+ if (!monitoringInfo.hasMetric()) {
+ LOG.info("Skipping metric-less MonitoringInfo: {}",
monitoringInfo);
+ return;
+ }
+ Metric metric = monitoringInfo.getMetric();
+
+ String urn = monitoringInfo.getUrn();
+ MetricName metricName = parseUserUrn(urn);
+ if (metricName == null) {
+ LOG.info("Dropping system metric: {}", monitoringInfo);
+ return;
+ }
+
+ Map<String, String> labels = monitoringInfo.getLabelsMap();
+ String ptransform = labels.get(PTRANSFORM_LABEL);
+
+ MetricsContainer metricsContainer = getMetricsContainer(ptransform);
+
+ MetricKey key = MetricKey.create(ptransform, metricName);
+
+ if (metric.hasCounterData()) {
+ CounterData counterData = metric.getCounterData();
+ if (counterData.getValueCase() ==
CounterData.ValueCase.INT64_VALUE) {
+ long value = counterData.getInt64Value();
+ org.apache.beam.sdk.metrics.Counter counter =
metricsContainer.getCounter(metricName);
+ counter.inc(value);
+
+ // Update flink
+ updateCounter(key, value);
+ } else {
+ LOG.warn("Unsupported CounterData type: {}", counterData);
+ }
+ } else if (metric.hasDistributionData()) {
+ DistributionData distributionData = metric.getDistributionData();
+ if (distributionData.hasIntDistributionData()) {
+ Distribution distribution =
metricsContainer.getDistribution(metricName);
+ IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
+ distribution.update(
+ intDistributionData.getSum(),
+ intDistributionData.getCount(),
+ intDistributionData.getMin(),
+ intDistributionData.getMax());
+
+ // Update flink
+ updateDistribution(
+ key,
+ DistributionResult.create(
+ intDistributionData.getSum(),
+ intDistributionData.getCount(),
+ intDistributionData.getMin(),
+ intDistributionData.getMax()));
+ } else {
+ LOG.warn("Unsupported DistributionData type: {}",
distributionData);
}
+ } else if (metric.hasExtremaData()) {
+ ExtremaData extremaData = metric.getExtremaData();
+ LOG.warn("Extrema metric unsupported: {}", extremaData);
}
});
- updateMetrics(stepName);
}
/**
* Update Flink's internal metrics ({@link this#flinkCounterCache}) with the
latest metrics for a
* given step.
*/
- void updateMetrics(String stepName) {
+ void updateFlinkMetrics(String stepName) {
MetricResults metricResults =
asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
MetricQueryResults metricQueryResults =
metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
- updateCounters(metricQueryResults.getCounters());
- updateDistributions(metricQueryResults.getDistributions());
- updateGauge(metricQueryResults.getGauges());
- }
-
- private void updateCounters(Iterable<MetricResult<Long>> counters) {
- for (MetricResult<Long> metricResult : counters) {
- String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
- Long update = metricResult.getAttempted();
+ updateMetrics(metricQueryResults.getCounters(), this::updateCounter);
+ updateMetrics(metricQueryResults.getDistributions(),
this::updateDistribution);
+ updateMetrics(metricQueryResults.getGauges(), this::updateGauge);
+ }
- // update flink metric
- Counter counter =
- flinkCounterCache.computeIfAbsent(
- flinkMetricName, n ->
runtimeContext.getMetricGroup().counter(n));
- counter.dec(counter.getCount());
- counter.inc(update);
+ private <T> void updateMetrics(
+ Iterable<MetricResult<T>> metricResults, BiConsumer<MetricKey, T> fn) {
+ for (MetricResult<T> metricResult : metricResults) {
+ fn.accept(metricResult.getKey(), metricResult.getAttempted());
}
}
- private void updateDistributions(Iterable<MetricResult<DistributionResult>>
distributions) {
- for (MetricResult<DistributionResult> metricResult : distributions) {
- String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
-
- DistributionResult update = metricResult.getAttempted();
-
- // update flink metric
- FlinkDistributionGauge gauge =
flinkDistributionGaugeCache.get(flinkMetricName);
- if (gauge == null) {
- gauge =
- runtimeContext
- .getMetricGroup()
- .gauge(flinkMetricName, new FlinkDistributionGauge(update));
- flinkDistributionGaugeCache.put(flinkMetricName, gauge);
- } else {
- gauge.update(update);
- }
+ private <T, FlinkT> void updateMetric(
+ MetricKey key,
+ Map<String, FlinkT> flinkMetricMap,
+ TriFunction<String, MetricGroup, T, FlinkT> create,
+ BiConsumer<FlinkT, T> update,
+ T value) {
+ String flinkMetricName = getFlinkMetricNameString(key);
+
+ // update flink metric
+ FlinkT metric = flinkMetricMap.get(flinkMetricName);
+ if (metric == null) {
+ metric = create.apply(flinkMetricName, runtimeContext.getMetricGroup(),
value);
+ flinkMetricMap.put(flinkMetricName, metric);
}
+ update.accept(metric, value);
}
- private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
- for (MetricResult<GaugeResult> metricResult : gauges) {
- String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
+ private void updateCounter(MetricKey metricKey, long attempted) {
+ updateMetric(
+ metricKey,
+ flinkCounterCache,
+ (name, group, value) -> group.counter(name),
+ (counter, value) -> {
+ counter.dec(counter.getCount());
+ counter.inc(value);
+ },
+ attempted);
+ }
- GaugeResult update = metricResult.getAttempted();
+ private void updateDistribution(MetricKey metricKey, DistributionResult
attempted) {
+ updateMetric(
+ metricKey,
+ flinkDistributionGaugeCache,
+ (name, group, value) -> group.gauge(name, new
FlinkDistributionGauge(value)),
+ FlinkDistributionGauge::update,
+ attempted);
+ }
- // update flink metric
- FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
- if (gauge == null) {
- gauge = runtimeContext.getMetricGroup().gauge(flinkMetricName, new
FlinkGauge(update));
- flinkGaugeCache.put(flinkMetricName, gauge);
- } else {
- gauge.update(update);
- }
- }
+ private void updateGauge(MetricKey metricKey, GaugeResult attempted) {
+ updateMetric(
+ metricKey,
+ flinkGaugeCache,
+ (name, group, value) -> group.gauge(name, new FlinkGauge(value)),
+ FlinkGauge::update,
+ attempted);
}
- @VisibleForTesting
static String getFlinkMetricNameString(MetricKey metricKey) {
MetricName metricName = metricKey.metricName();
- // We use only the MetricName here, the step name is already contained
- // in the operator name which is passed to Flink's MetricGroup to which
- // the metric with the following name will be added.
- return metricName.getNamespace() + METRIC_KEY_SEPARATOR +
metricName.getName();
+ return String.join(
Review comment:
I've not. I'll do that and report back.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 209822)
> Send metrics to Flink in portable Flink runner
> ----------------------------------------------
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Ryan Williams
> Assignee: Ryan Williams
> Priority: Major
> Labels: metrics, portability, portability-flink
> Fix For: 2.10.0
>
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to the runner in the Python SDK (and likely
> in the Java SDK soon), but the portable Flink runner doesn't pass them on to Flink.
> It should, so that users can see them in, e.g., the Flink UI or via any
> Flink metrics reporter.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)