Cherry-pick pull request #2649 into release-2.0.0 branch
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a4ffd2c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a4ffd2c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a4ffd2c Branch: refs/heads/release-2.0.0 Commit: 3a4ffd2ce8e90486cf51f420a42599ddf95b9a5d Parents: bad377c Author: Aviem Zur <[email protected]> Authored: Fri May 5 23:13:24 2017 +0300 Committer: Dan Halperin <[email protected]> Committed: Tue May 9 09:36:18 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/core/LateDataUtils.java | 2 +- .../beam/runners/flink/FlinkRunnerResult.java | 8 +- .../metrics/DoFnRunnerWithMetricsUpdate.java | 12 +- .../flink/metrics/FlinkMetricContainer.java | 273 +++-------- .../flink/metrics/FlinkMetricResults.java | 146 ------ .../flink/metrics/MetricsAccumulator.java | 60 +++ .../flink/metrics/ReaderInvocationUtil.java | 7 +- .../translation/wrappers/SourceInputFormat.java | 8 +- .../streaming/io/BoundedSourceWrapper.java | 8 +- .../streaming/io/UnboundedSourceWrapper.java | 9 +- .../beam/runners/spark/SparkPipelineResult.java | 8 +- .../apache/beam/runners/spark/io/SourceRDD.java | 4 +- .../runners/spark/io/SparkUnboundedSource.java | 19 +- .../spark/metrics/MetricsAccumulator.java | 20 +- .../spark/metrics/MetricsAccumulatorParam.java | 20 +- .../runners/spark/metrics/SparkBeamMetric.java | 11 +- .../spark/metrics/SparkBeamMetricSource.java | 2 +- .../spark/metrics/SparkMetricResults.java | 172 ------- .../spark/metrics/SparkMetricsContainer.java | 174 ------- .../SparkGroupAlsoByWindowViaWindowSet.java | 4 +- .../spark/stateful/StateSpecFunctions.java | 8 +- .../translation/DoFnRunnerWithMetrics.java | 6 +- .../spark/translation/MultiDoFnFunction.java | 6 +- .../spark/translation/TransformTranslator.java | 4 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../apache/beam/sdk/metrics/CounterCell.java | 27 +- .../org/apache/beam/sdk/metrics/DirtyState.java | 3 +- .../beam/sdk/metrics/DistributionCell.java | 16 +- .../org/apache/beam/sdk/metrics/GaugeCell.java | 20 +- .../org/apache/beam/sdk/metrics/MetricCell.java | 14 +- .../org/apache/beam/sdk/metrics/Metrics.java | 2 +- .../beam/sdk/metrics/MetricsContainer.java | 29 +- .../sdk/metrics/MetricsContainerStepMap.java | 487 +++++++++++++++++++ .../org/apache/beam/sdk/metrics/MetricsMap.java | 5 +- .../beam/sdk/metrics/CounterCellTest.java | 6 +- .../metrics/MetricsContainerStepMapTest.java | 258 ++++++++++ .../beam/sdk/metrics/MetricsContainerTest.java | 14 +- 37 files changed, 1086 insertions(+), 790 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index c45387b..f7c0d31 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -71,7 +71,7 @@ public class LateDataUtils { .isBefore(timerInternals.currentInputWatermarkTime()); if (expired) { // The element is too late for this window. - droppedDueToLateness.inc(); + droppedDueToLateness.update(1L); WindowTracing.debug( "GroupAlsoByWindow: Dropping element at {} for key: {}; " + "window: {} since it is too far behind inputWatermark: {}", http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 90dc79b..038895a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -17,12 +17,15 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.io.IOException; import java.util.Collections; import java.util.Map; -import org.apache.beam.runners.flink.metrics.FlinkMetricResults; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.joda.time.Duration; /** @@ -72,6 +75,7 @@ public class FlinkRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - return new FlinkMetricResults(accumulators); + return asAttemptedOnlyMetricResults( + (MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index dae91fe..40191d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -34,6 +34,7 @@ import org.joda.time.Instant; */ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + private final String stepName; private final FlinkMetricContainer container; private final DoFnRunner<InputT, OutputT> delegate; @@ -41,14 +42,15 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< String stepName, DoFnRunner<InputT, OutputT> delegate, RuntimeContext runtimeContext) { + this.stepName = stepName; this.delegate = delegate; - container = new FlinkMetricContainer(stepName, runtimeContext); + container = new FlinkMetricContainer(runtimeContext); } @Override public void startBundle() { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.startBundle(); } catch (IOException e) { throw new RuntimeException(e); @@ -58,7 +60,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< @Override public void processElement(final WindowedValue<InputT> elem) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.processElement(elem); } catch (IOException e) { throw new RuntimeException(e); @@ -69,7 +71,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp, final TimeDomain timeDomain) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.onTimer(timerId, window, timestamp, timeDomain); } catch (IOException e) { throw new RuntimeException(e); @@ -79,7 +81,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< @Override public void finishBundle() { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.finishBundle(); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index d020f69..f81205e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,19 +17,24 @@ */ package org.apache.beam.runners.flink.metrics; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.util.HashMap; import java.util.Map; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to @@ -37,46 +42,61 @@ import org.apache.flink.metrics.Gauge; */ public class FlinkMetricContainer { + public static final String ACCUMULATOR_NAME = "__metricscontainers"; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkMetricContainer.class); + private static final String METRIC_KEY_SEPARATOR = "__"; - static final String COUNTER_PREFIX = "__counter"; - static final String DISTRIBUTION_PREFIX = "__distribution"; - static final String GAUGE_PREFIX = "__gauge"; + private static final String COUNTER_PREFIX = "__counter"; + private static final String DISTRIBUTION_PREFIX = "__distribution"; + private static final String GAUGE_PREFIX = "__gauge"; - private final MetricsContainer metricsContainer; private final RuntimeContext runtimeContext; private final Map<String, Counter> flinkCounterCache; private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache; private final Map<String, FlinkGauge> flinkGaugeCache; + private final MetricsAccumulator metricsAccumulator; - public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) { - metricsContainer = new MetricsContainer(stepName); + public FlinkMetricContainer(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; - flinkCounterCache = new HashMap<>(); - flinkDistributionGaugeCache = new HashMap<>(); - flinkGaugeCache = new HashMap<>(); + this.flinkCounterCache = new HashMap<>(); + this.flinkDistributionGaugeCache = new HashMap<>(); + this.flinkGaugeCache = new HashMap<>(); + + Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> metricsAccumulator = + runtimeContext.getAccumulator(ACCUMULATOR_NAME); + if (metricsAccumulator == null) { + metricsAccumulator = new MetricsAccumulator(); + try { + runtimeContext.addAccumulator(ACCUMULATOR_NAME, metricsAccumulator); + } catch (Exception e) { + LOG.error("Failed to create metrics accumulator.", e); + } + } + this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator; } - public MetricsContainer getMetricsContainer() { - return metricsContainer; + MetricsContainer getMetricsContainer(String stepName) { + return metricsAccumulator != null + ? metricsAccumulator.getLocalValue().getContainer(stepName) + : null; } - public void updateMetrics() { - // update metrics - MetricUpdates updates = metricsContainer.getUpdates(); - if (updates != null) { - updateCounters(updates.counterUpdates()); - updateDistributions(updates.distributionUpdates()); - updateGauge(updates.gaugeUpdates()); - metricsContainer.commitUpdates(); - } + void updateMetrics() { + MetricResults metricResults = + asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue()); + MetricQueryResults metricQueryResults = + metricResults.queryMetrics(MetricsFilter.builder().build()); + updateCounters(metricQueryResults.counters()); + updateDistributions(metricQueryResults.distributions()); + updateGauge(metricQueryResults.gauges()); } - private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> updates) { - - for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) { + private void updateCounters(Iterable<MetricResult<Long>> counters) { + for (MetricResult<Long> metricResult : counters) { + String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricResult); - String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey()); - Long update = metricUpdate.getUpdate(); + Long update = metricResult.attempted(); // update flink metric Counter counter = flinkCounterCache.get(flinkMetricName); @@ -86,26 +106,15 @@ public class FlinkMetricContainer { } counter.dec(counter.getCount()); counter.inc(update); - - // update flink accumulator - Accumulator<Long, Long> accumulator = runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new LongCounter(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } else { - accumulator.resetLocal(); - accumulator.add(update); - } } } - private void updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> updates) { - - for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) { - + private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) { + for (MetricResult<DistributionResult> metricResult : distributions) { String flinkMetricName = - getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey()); - DistributionData update = metricUpdate.getUpdate(); + getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricResult); + + DistributionResult update = metricResult.attempted(); // update flink metric FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); @@ -116,26 +125,15 @@ public class FlinkMetricContainer { } else { gauge.update(update); } - - // update flink accumulator - Accumulator<DistributionData, DistributionData> accumulator = - runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new FlinkDistributionDataAccumulator(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } else { - accumulator.resetLocal(); - accumulator.add(update); - } } } - private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> updates) { - for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) { - + private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) { + for (MetricResult<GaugeResult> metricResult : gauges) { String flinkMetricName = - getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey()); - GaugeData update = metricUpdate.getUpdate(); + getFlinkMetricNameString(GAUGE_PREFIX, metricResult); + + GaugeResult update = metricResult.attempted(); // update flink metric FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); @@ -146,170 +144,55 @@ public class FlinkMetricContainer { } else { gauge.update(update); } - - // update flink accumulator - Accumulator<GaugeData, GaugeData> accumulator = - runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new FlinkGaugeAccumulator(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } - accumulator.resetLocal(); - accumulator.add(update); } } - private static String getFlinkMetricNameString(String prefix, MetricKey key) { + private static String getFlinkMetricNameString(String prefix, MetricResult<?> metricResult) { return prefix - + METRIC_KEY_SEPARATOR + key.stepName() - + METRIC_KEY_SEPARATOR + key.metricName().namespace() - + METRIC_KEY_SEPARATOR + key.metricName().name(); - } - - static MetricKey parseMetricKey(String flinkMetricName) { - String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR); - return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4])); + + METRIC_KEY_SEPARATOR + metricResult.step() + + METRIC_KEY_SEPARATOR + metricResult.name().namespace() + + METRIC_KEY_SEPARATOR + metricResult.name().name(); } /** - * Flink {@link Gauge} for {@link DistributionData}. + * Flink {@link Gauge} for {@link DistributionResult}. */ - public static class FlinkDistributionGauge implements Gauge<DistributionData> { + public static class FlinkDistributionGauge implements Gauge<DistributionResult> { - DistributionData data; + DistributionResult data; - FlinkDistributionGauge(DistributionData data) { + FlinkDistributionGauge(DistributionResult data) { this.data = data; } - void update(DistributionData data) { + void update(DistributionResult data) { this.data = data; } @Override - public DistributionData getValue() { + public DistributionResult getValue() { return data; } } /** - * Flink {@link Gauge} for {@link GaugeData}. + * Flink {@link Gauge} for {@link GaugeResult}. */ - public static class FlinkGauge implements Gauge<GaugeData> { + public static class FlinkGauge implements Gauge<GaugeResult> { - GaugeData data; + GaugeResult data; - FlinkGauge(GaugeData data) { + FlinkGauge(GaugeResult data) { this.data = data; } - void update(GaugeData update) { - this.data = data.combine(update); + void update(GaugeResult update) { + this.data = update; } @Override - public GaugeData getValue() { + public GaugeResult getValue() { return data; } } - - /** - * Flink {@link Accumulator} for {@link GaugeData}. - */ - public static class FlinkDistributionDataAccumulator implements - Accumulator<DistributionData, DistributionData> { - - private static final long serialVersionUID = 1L; - - private DistributionData data; - - public FlinkDistributionDataAccumulator(DistributionData data) { - this.data = data; - } - - @Override - public void add(DistributionData value) { - if (data == null) { - this.data = value; - } else { - this.data = this.data.combine(value); - } - } - - @Override - public DistributionData getLocalValue() { - return data; - } - - @Override - public void resetLocal() { - data = null; - } - - @Override - public void merge(Accumulator<DistributionData, DistributionData> other) { - data = data.combine(other.getLocalValue()); - } - - @Override - public Accumulator<DistributionData, DistributionData> clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - - return new FlinkDistributionDataAccumulator( - DistributionData.create(data.sum(), data.count(), data.min(), data.max())); - } - } - - /** - * Flink {@link Accumulator} for {@link GaugeData}. - */ - public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, GaugeData> { - - private GaugeData data; - - public FlinkGaugeAccumulator(GaugeData data) { - this.data = data; - } - - @Override - public void add(GaugeData value) { - if (data == null) { - this.data = value; - } else { - this.data = this.data.combine(value); - } - } - - @Override - public GaugeData getLocalValue() { - return data; - } - - @Override - public void resetLocal() { - this.data = null; - } - - @Override - public void merge(Accumulator<GaugeData, GaugeData> other) { - data = data.combine(other.getLocalValue()); - } - - @Override - public Accumulator<GaugeData, GaugeData> clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - - return new FlinkGaugeAccumulator( - GaugeData.create(data.value())); - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java deleted file mode 100644 index 9e1430b..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.metrics; - - -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX; -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX; -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricFiltering; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsFilter; - -/** - * Implementation of {@link MetricResults} for the Flink Runner. - */ -public class FlinkMetricResults extends MetricResults { - - private Map<String, Object> accumulators; - - public FlinkMetricResults(Map<String, Object> accumulators) { - this.accumulators = accumulators; - } - - @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { - return new FlinkMetricQueryResults(filter); - } - - private class FlinkMetricQueryResults implements MetricQueryResults { - - private MetricsFilter filter; - - FlinkMetricQueryResults(MetricsFilter filter) { - this.filter = filter; - } - - @Override - public Iterable<MetricResult<Long>> counters() { - List<MetricResult<Long>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(COUNTER_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), (Long) accumulator.getValue())); - } - } - } - return result; - } - - @Override - public Iterable<MetricResult<DistributionResult>> distributions() { - List<MetricResult<DistributionResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - DistributionData data = (DistributionData) accumulator.getValue(); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), data.extractResult())); - } - } - } - return result; - } - - @Override - public Iterable<MetricResult<GaugeResult>> gauges() { - List<MetricResult<GaugeResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(GAUGE_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - GaugeData data = (GaugeData) accumulator.getValue(); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), data.extractResult())); - } - } - } - return result; - } - - } - - private static class FlinkMetricResult<T> implements MetricResult<T> { - private final MetricName name; - private final String step; - private final T result; - - FlinkMetricResult(MetricName name, String step, T result) { - this.name = name; - this.step = step; - this.result = result; - } - - @Override - public MetricName name() { - return name; - } - - @Override - public String step() { - return step; - } - - @Override - public T committed() { - throw new UnsupportedOperationException("Flink runner does not currently support committed" - + " metrics results. Please use 'attempted' instead."); - } - - @Override - public T attempted() { - return result; - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java new file mode 100644 index 0000000..a9dc2ce --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.SimpleAccumulator; + +/** + * Accumulator of {@link MetricsContainerStepMap}. + */ +public class MetricsAccumulator implements SimpleAccumulator<MetricsContainerStepMap> { + private MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); + + @Override + public void add(MetricsContainerStepMap value) { + metricsContainers.updateAll(value); + } + + @Override + public MetricsContainerStepMap getLocalValue() { + return metricsContainers; + } + + @Override + public void resetLocal() { + this.metricsContainers = new MetricsContainerStepMap(); + } + + @Override + public void merge(Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> other) { + this.add(other.getLocalValue()); + } + + @Override + public Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> clone() { + try { + super.clone(); + } catch (CloneNotSupportedException ignored) { + } + MetricsAccumulator metricsAccumulator = new MetricsAccumulator(); + metricsAccumulator.getLocalValue().updateAll(this.getLocalValue()); + return metricsAccumulator; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 38263d9..64738cc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -32,13 +32,16 @@ import org.apache.beam.sdk.options.PipelineOptions; */ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> { + private final String stepName; private final FlinkMetricContainer container; private final Boolean enableMetrics; public ReaderInvocationUtil( + String stepName, PipelineOptions options, FlinkMetricContainer container) { FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); + this.stepName = stepName; enableMetrics = flinkPipelineOptions.getEnableMetrics(); this.container = container; } @@ -46,7 +49,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT public boolean invokeStart(ReaderT reader) throws IOException { if (enableMetrics) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { boolean result = reader.start(); container.updateMetrics(); return result; @@ -59,7 +62,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT public boolean invokeAdvance(ReaderT reader) throws IOException { if (enableMetrics) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { boolean result = reader.advance(); container.updateMetrics(); return result; http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index f2b81fc..27e6912 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -71,9 +71,13 @@ public class SourceInputFormat<T> @Override public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); inputAvailable = readerInvoker.invokeStart(reader); http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index a142685..6d75688 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -104,9 +104,13 @@ public class BoundedSourceWrapper<OutputT> numSubtasks, localSources); - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); readers = new ArrayList<>(); // initialize readers from scratch http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index a731e2b..ec21699 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -215,10 +215,13 @@ public class UnboundedSourceWrapper< context = ctx; - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); - ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); if (localReaders.size() == 0) { // do nothing, but still look busy ... http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index 3e94a45..3986e33 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -18,13 +18,15 @@ package org.apache.beam.runners.spark; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.io.IOException; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.beam.runners.spark.metrics.SparkMetricResults; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -41,7 +43,6 @@ public abstract class SparkPipelineResult implements PipelineResult { protected final Future pipelineExecution; protected JavaSparkContext javaSparkContext; protected PipelineResult.State state; - private final SparkMetricResults metricResults = new SparkMetricResults(); SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) { this.pipelineExecution = pipelineExecution; @@ -106,7 +107,8 @@ public abstract class SparkPipelineResult implements PipelineResult { @Override public MetricResults metrics() { - return metricResults; + return asAttemptedOnlyMetricResults( + MetricsAccumulator.getInstance().value()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index e294359..71a19e7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -26,12 +26,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.Accumulator; @@ -65,7 +65,7 @@ public class SourceRDD { private final SparkRuntimeContext runtimeContext; private final int numPartitions; private final String stepName; - private final Accumulator<SparkMetricsContainer> metricsAccum; + private final Accumulator<MetricsContainerStepMap> metricsAccum; // to satisfy Scala API. private static final scala.collection.immutable.Seq<Dependency<?>> NIL = http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 0388f6c..2a9de4b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -25,7 +25,6 @@ import java.util.Collections; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.stateful.StateSpecFunctions; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; @@ -37,6 +36,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -190,7 +190,7 @@ public class SparkUnboundedSource { public scala.Option<RDD<BoxedUnit>> compute(Time validTime) { // compute parent. scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime); - final Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); + final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); long count = 0; SparkWatermarks sparkWatermark = null; Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE; @@ -211,7 +211,7 @@ public class SparkUnboundedSource { ? partitionHighWatermark : globalHighWatermarkForBatch; // Update metrics reported in the read final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS); - final MetricsContainer container = metadata.getMetricsContainer().getContainer(stepName); + final MetricsContainer container = metadata.getMetricsContainers().getContainer(stepName); try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) { final long readDurationMillis = metadata.getReadDurationMillis(); if (readDurationMillis > maxReadDuration) { @@ -220,7 +220,7 @@ public class SparkUnboundedSource { } catch (IOException e) { throw new RuntimeException(e); } - metricsAccum.value().update(metadata.getMetricsContainer()); + metricsAccum.value().updateAll(metadata.getMetricsContainers()); } sparkWatermark = @@ -260,20 +260,19 @@ public class SparkUnboundedSource { private final Instant lowWatermark; private final Instant highWatermark; private final long readDurationMillis; - private final SparkMetricsContainer metricsContainer; + private final MetricsContainerStepMap metricsContainers; public Metadata( long numRecords, Instant lowWatermark, Instant highWatermark, final long readDurationMillis, - SparkMetricsContainer metricsContainer) { + MetricsContainerStepMap metricsContainer) { this.numRecords = numRecords; this.readDurationMillis = readDurationMillis; - this.metricsContainer = metricsContainer; + this.metricsContainers = metricsContainer; this.lowWatermark = lowWatermark; this.highWatermark = highWatermark; - metricsContainer.materialize(); } long getNumRecords() { @@ -292,8 +291,8 @@ public class SparkUnboundedSource { return readDurationMillis; } - SparkMetricsContainer getMetricsContainer() { - return metricsContainer; + MetricsContainerStepMap getMetricsContainers() { + return metricsContainers; } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index 1153db6..1dcfa2f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.Checkpoint; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.Accumulator; @@ -44,7 +45,7 @@ public class MetricsAccumulator { private static final String ACCUMULATOR_NAME = "Beam.Metrics"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics"; - private static volatile Accumulator<SparkMetricsContainer> instance = null; + private static volatile Accumulator<MetricsContainerStepMap> instance = null; private static volatile FileSystem fileSystem; private static volatile Path checkpointFilePath; @@ -58,11 +59,13 @@ public class MetricsAccumulator { Optional<CheckpointDir> maybeCheckpointDir = opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) : Optional.<CheckpointDir>absent(); - Accumulator<SparkMetricsContainer> accumulator = - jsc.sc().accumulator(new SparkMetricsContainer(), ACCUMULATOR_NAME, + Accumulator<MetricsContainerStepMap> accumulator = + jsc.sc().accumulator( + new MetricsContainerStepMap(), + ACCUMULATOR_NAME, new MetricsAccumulatorParam()); if (maybeCheckpointDir.isPresent()) { - Optional<SparkMetricsContainer> maybeRecoveredValue = + Optional<MetricsContainerStepMap> maybeRecoveredValue = recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get()); if (maybeRecoveredValue.isPresent()) { accumulator.setValue(maybeRecoveredValue.get()); @@ -75,7 +78,7 @@ public class MetricsAccumulator { } } - public static Accumulator<SparkMetricsContainer> getInstance() { + public static Accumulator<MetricsContainerStepMap> getInstance() { if (instance == null) { throw new IllegalStateException("Metrics accumulator has not been instantiated"); } else { @@ -83,14 +86,15 @@ public class MetricsAccumulator { } } - private static Optional<SparkMetricsContainer> recoverValueFromCheckpoint( + private static Optional<MetricsContainerStepMap> recoverValueFromCheckpoint( JavaSparkContext jsc, CheckpointDir checkpointDir) { try { Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir(); checkpointFilePath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME); fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration()); - SparkMetricsContainer recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath); + MetricsContainerStepMap recoveredValue = + Checkpoint.readObject(fileSystem, checkpointFilePath); if (recoveredValue != null) { LOG.info("Recovered metrics from checkpoint."); return Optional.of(recoveredValue); @@ -117,7 +121,7 @@ public class MetricsAccumulator { } /** - * Spark Listener which checkpoints {@link SparkMetricsContainer} values for fault-tolerance. + * Spark Listener which checkpoints {@link MetricsContainerStepMap} values for fault-tolerance. */ public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java index 9948c81..dee4ebc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java @@ -18,25 +18,31 @@ package org.apache.beam.runners.spark.metrics; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.spark.AccumulatorParam; /** * Metrics accumulator param. */ -class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> { +class MetricsAccumulatorParam implements AccumulatorParam<MetricsContainerStepMap> { @Override - public SparkMetricsContainer addAccumulator(SparkMetricsContainer c1, SparkMetricsContainer c2) { - return c1.update(c2); + public MetricsContainerStepMap addAccumulator( + MetricsContainerStepMap c1, + MetricsContainerStepMap c2) { + return addInPlace(c1, c2); } @Override - public SparkMetricsContainer addInPlace(SparkMetricsContainer c1, SparkMetricsContainer c2) { - return c1.update(c2); + public MetricsContainerStepMap addInPlace( + MetricsContainerStepMap c1, + MetricsContainerStepMap c2) { + c1.updateAll(c2); + return c1; } @Override - public SparkMetricsContainer zero(SparkMetricsContainer initialValue) { - return new SparkMetricsContainer(); + public MetricsContainerStepMap zero(MetricsContainerStepMap initialValue) { + return new MetricsContainerStepMap(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java index 2d445a9..e4bd598 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.spark.metrics; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import com.codahale.metrics.Metric; import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; @@ -27,20 +29,23 @@ import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsFilter; /** - * An adapter between the {@link SparkMetricsContainer} and Codahale's {@link Metric} interface. + * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface. */ class SparkBeamMetric implements Metric { private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]"; private static final String ILLEGAL_CHARACTERS_AND_PERIOD = "[^A-Za-z0-9_-]"; - private final SparkMetricResults metricResults = new SparkMetricResults(); - Map<String, ?> renderAll() { Map<String, Object> metrics = new HashMap<>(); + MetricResults metricResults = + asAttemptedOnlyMetricResults( + MetricsAccumulator.getInstance().value()); MetricQueryResults metricQueryResults = metricResults.queryMetrics(MetricsFilter.builder().build()); for (MetricResult<Long> metricResult : metricQueryResults.counters()) { http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java index 5c6fc24..03128d7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java @@ -24,7 +24,7 @@ import org.apache.spark.metrics.source.Source; /** * A Spark {@link Source} that is tailored to expose a {@link SparkBeamMetric}, - * wrapping an underlying {@link SparkMetricsContainer} instance. + * wrapping an underlying {@link org.apache.beam.sdk.metrics.MetricResults} instance. */ public class SparkBeamMetricSource implements Source { private static final String METRIC_NAME = "Metrics"; http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java deleted file mode 100644 index faf4c52..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.metrics; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricFiltering; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; -import org.apache.beam.sdk.metrics.MetricsFilter; - - -/** - * Implementation of {@link MetricResults} for the Spark Runner. - */ -public class SparkMetricResults extends MetricResults { - - @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { - return new SparkMetricQueryResults(filter); - } - - private static class SparkMetricQueryResults implements MetricQueryResults { - private final MetricsFilter filter; - - SparkMetricQueryResults(MetricsFilter filter) { - this.filter = filter; - } - - @Override - public Iterable<MetricResult<Long>> counters() { - return - FluentIterable - .from(SparkMetricsContainer.getCounters()) - .filter(matchesFilter(filter)) - .transform(TO_COUNTER_RESULT) - .toList(); - } - - @Override - public Iterable<MetricResult<DistributionResult>> distributions() { - return - FluentIterable - .from(SparkMetricsContainer.getDistributions()) - .filter(matchesFilter(filter)) - .transform(TO_DISTRIBUTION_RESULT) - .toList(); - } - - @Override - public Iterable<MetricResult<GaugeResult>> gauges() { - return - FluentIterable - .from(SparkMetricsContainer.getGauges()) - .filter(matchesFilter(filter)) - .transform(TO_GAUGE_RESULT) - .toList(); - } - - private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter filter) { - return new Predicate<MetricUpdate<?>>() { - @Override - public boolean apply(MetricUpdate<?> metricResult) { - return MetricFiltering.matches(filter, metricResult.getKey()); - } - }; - } - } - - private static final Function<MetricUpdate<DistributionData>, MetricResult<DistributionResult>> - TO_DISTRIBUTION_RESULT = - new Function<MetricUpdate<DistributionData>, MetricResult<DistributionResult>>() { - @Override - public MetricResult<DistributionResult> apply(MetricUpdate<DistributionData> metricResult) { - if (metricResult != null) { - MetricKey key = metricResult.getKey(); - return new SparkMetricResult<>(key.metricName(), key.stepName(), - metricResult.getUpdate().extractResult()); - } else { - return null; - } - } - }; - - private static final Function<MetricUpdate<Long>, MetricResult<Long>> - TO_COUNTER_RESULT = - new Function<MetricUpdate<Long>, MetricResult<Long>>() { - @Override - public MetricResult<Long> apply(MetricUpdate<Long> metricResult) { - if (metricResult != null) { - MetricKey key = metricResult.getKey(); - return new SparkMetricResult<>(key.metricName(), key.stepName(), - metricResult.getUpdate()); - } else { - return null; - } - } - }; - - private static final Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>> - TO_GAUGE_RESULT = - new Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>() { - @Override - public MetricResult<GaugeResult> apply(MetricUpdate<GaugeData> metricResult) { - if (metricResult != null) { - MetricKey key = metricResult.getKey(); - return new SparkMetricResult<>(key.metricName(), key.stepName(), - metricResult.getUpdate().extractResult()); - } else { - return null; - } - } - }; - - private static class SparkMetricResult<T> implements MetricResult<T> { - private final MetricName name; - private final String step; - private final T result; - - SparkMetricResult(MetricName name, String step, T result) { - this.name = name; - this.step = step; - this.result = result; - } - - @Override - public MetricName name() { - return name; - } - - @Override - public String step() { - return step; - } - - @Override - public T committed() { - throw new UnsupportedOperationException("Spark runner does not currently support committed" - + " metrics results. Please use 'attempted' instead."); - } - - @Override - public T attempted() { - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java deleted file mode 100644 index 9e94c14..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.metrics; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricUpdates; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Spark accumulator value which holds all {@link MetricsContainer}s, aggregates and merges them. - */ -public class SparkMetricsContainer implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(SparkMetricsContainer.class); - - private transient volatile LoadingCache<String, MetricsContainer> metricsContainers; - - private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>(); - private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = new HashMap<>(); - private final Map<MetricKey, MetricUpdate<GaugeData>> gauges = new HashMap<>(); - - public MetricsContainer getContainer(String stepName) { - if (metricsContainers == null) { - synchronized (this) { - if (metricsContainers == null) { - initializeMetricsContainers(); - } - } - } - try { - return metricsContainers.get(stepName); - } catch (ExecutionException e) { - LOG.error("Error while creating metrics container", e); - return null; - } - } - - static Collection<MetricUpdate<Long>> getCounters() { - SparkMetricsContainer sparkMetricsContainer = getInstance(); - sparkMetricsContainer.materialize(); - return sparkMetricsContainer.counters.values(); - } - - static Collection<MetricUpdate<DistributionData>> getDistributions() { - SparkMetricsContainer sparkMetricsContainer = getInstance(); - sparkMetricsContainer.materialize(); - return sparkMetricsContainer.distributions.values(); - } - - static Collection<MetricUpdate<GaugeData>> getGauges() { - return getInstance().gauges.values(); - } - - public SparkMetricsContainer update(SparkMetricsContainer other) { - other.materialize(); - this.updateCounters(other.counters.values()); - this.updateDistributions(other.distributions.values()); - this.updateGauges(other.gauges.values()); - return this; - } - - private static SparkMetricsContainer getInstance() { - return MetricsAccumulator.getInstance().value(); - } - - private void writeObject(ObjectOutputStream out) throws IOException { - // Since MetricsContainer instances are not serializable, materialize a serializable map of - // MetricsAggregators relating to the same metrics. This is done here, when Spark serializes - // the SparkMetricsContainer accumulator before sending results back to the driver at a point in - // time where all the metrics updates have already been made to the MetricsContainers. - materialize(); - out.defaultWriteObject(); - } - - /** - * Materialize metrics. Must be called to enable this instance's data to be serialized correctly. - * This method is idempotent. - */ - public void materialize() { - // Nullifying metricsContainers makes this method idempotent. - if (metricsContainers != null) { - for (MetricsContainer container : metricsContainers.asMap().values()) { - MetricUpdates cumulative = container.getCumulative(); - this.updateCounters(cumulative.counterUpdates()); - this.updateDistributions(cumulative.distributionUpdates()); - this.updateGauges(cumulative.gaugeUpdates()); - } - metricsContainers = null; - } - } - - private void updateCounters(Iterable<MetricUpdate<Long>> updates) { - for (MetricUpdate<Long> update : updates) { - MetricKey key = update.getKey(); - MetricUpdate<Long> current = counters.get(key); - counters.put(key, current != null - ? MetricUpdate.create(key, current.getUpdate() + update.getUpdate()) : update); - } - } - - private void updateDistributions(Iterable<MetricUpdate<DistributionData>> updates) { - for (MetricUpdate<DistributionData> update : updates) { - MetricKey key = update.getKey(); - MetricUpdate<DistributionData> current = distributions.get(key); - distributions.put(key, current != null - ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate())) : update); - } - } - - private void updateGauges(Iterable<MetricUpdate<GaugeData>> updates) { - for (MetricUpdate<GaugeData> update : updates) { - MetricKey key = update.getKey(); - MetricUpdate<GaugeData> current = gauges.get(key); - gauges.put( - key, - current != null - ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate())) - : update); - } - } - - private static class MetricsContainerCacheLoader extends CacheLoader<String, MetricsContainer> { - @SuppressWarnings("NullableProblems") - @Override - public MetricsContainer load(String stepName) throws Exception { - return new MetricsContainer(stepName); - } - } - - private void initializeMetricsContainers() { - metricsContainers = CacheBuilder.<String, SparkMetricsContainer>newBuilder() - .build(new MetricsContainerCacheLoader()); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - for (Map.Entry<String, ?> metric : new SparkBeamMetric().renderAll().entrySet()) { - sb.append(metric.getKey()).append(": ").append(metric.getValue()).append(" "); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 9bc8760..37d9635 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -321,12 +321,12 @@ public class SparkGroupAlsoByWindowViaWindowSet { long lateDropped = droppedDueToLateness.getCumulative(); if (lateDropped > 0) { LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped)); - droppedDueToLateness.inc(-droppedDueToLateness.getCumulative()); + droppedDueToLateness.update(-droppedDueToLateness.getCumulative()); } long closedWindowDropped = droppedDueToClosedWindow.getCumulative(); if (closedWindowDropped > 0) { LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped)); - droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative()); + droppedDueToClosedWindow.update(-droppedDueToClosedWindow.getCumulative()); } return scala.collection.JavaConversions.asScalaIterator(outIter); http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index d8d52c4..17a3c73 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -31,12 +31,12 @@ import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.EmptyCheckpointMark; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -110,8 +110,8 @@ public class StateSpecFunctions { scala.Option<CheckpointMarkT> startCheckpointMark, State<Tuple2<byte[], Instant>> state) { - SparkMetricsContainer sparkMetricsContainer = new SparkMetricsContainer(); - MetricsContainer metricsContainer = sparkMetricsContainer.getContainer(stepName); + MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); + MetricsContainer metricsContainer = metricsContainers.getContainer(stepName); // Add metrics container to the scope of org.apache.beam.sdk.io.Source.Reader methods // since they may report metrics. @@ -214,7 +214,7 @@ public class StateSpecFunctions { lowWatermark, highWatermark, readDurationMillis, - sparkMetricsContainer)); + metricsContainers)); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index d74b253..8349b09 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation; import java.io.Closeable; import java.io.IOException; import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -37,12 +37,12 @@ import org.joda.time.Instant; class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { private final DoFnRunner<InputT, OutputT> delegate; private final String stepName; - private final Accumulator<SparkMetricsContainer> metricsAccum; + private final Accumulator<MetricsContainerStepMap> metricsAccum; DoFnRunnerWithMetrics( String stepName, DoFnRunner<InputT, OutputT> delegate, - Accumulator<SparkMetricsContainer> metricsAccum) { + Accumulator<MetricsContainerStepMap> metricsAccum) { this.delegate = delegate; this.stepName = stepName; this.metricsAccum = metricsAccum; http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 9bfd2fa..ecf96b6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -28,9 +28,9 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.spark.aggregators.NamedAggregators; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -52,7 +52,7 @@ public class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> { private final Accumulator<NamedAggregators> aggAccum; - private final Accumulator<SparkMetricsContainer> metricsAccum; + private final Accumulator<MetricsContainerStepMap> metricsAccum; private final String stepName; private final DoFn<InputT, OutputT> doFn; private final SparkRuntimeContext runtimeContext; @@ -71,7 +71,7 @@ public class MultiDoFnFunction<InputT, OutputT> */ public MultiDoFnFunction( Accumulator<NamedAggregators> aggAccum, - Accumulator<SparkMetricsContainer> metricsAccum, + Accumulator<MetricsContainerStepMap> metricsAccum, String stepName, DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 8a8e246..acbac32 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -35,13 +35,13 @@ import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.Create; @@ -359,7 +359,7 @@ public final class TransformTranslator { WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); - Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance(); + Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD.mapPartitionsToPair( new MultiDoFnFunction<>( http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 2c4a747..f736e53 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -39,7 +39,6 @@ import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.SparkUnboundedSource; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; -import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet; import org.apache.beam.runners.spark.translation.BoundedDataset; import org.apache.beam.runners.spark.translation.Dataset; @@ -59,6 +58,7 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.DoFn; @@ -395,7 +395,7 @@ public final class StreamingTransformTranslator { JavaRDD<WindowedValue<InputT>> rdd) throws Exception { final Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance(); - final Accumulator<SparkMetricsContainer> metricsAccum = + final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java index 7ab5ebc..4b8548f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * indirection. */ @Experimental(Kind.METRICS) -public class CounterCell implements MetricCell<Long> { +public class CounterCell implements MetricCell<Counter, Long> { private final DirtyState dirty = new DirtyState(); private final AtomicLong value = new AtomicLong(); @@ -41,13 +41,26 @@ public class CounterCell implements MetricCell<Long> { */ CounterCell() {} - /** Increment the counter by the given amount. */ - private void add(long n) { + /** + * Increment the counter by the given amount. + * @param n value to increment by. Can be negative to decrement. + */ + public void update(long n) { value.addAndGet(n); dirty.afterModification(); } @Override + public void update(Long n) { + throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used" + + " as it performs unnecessary boxing/unboxing. Use CounterCell.update(long n) instead."); + } + + @Override public void update(MetricCell<Counter, Long> other) { + update((long) other.getCumulative()); + } + + @Override public DirtyState getDirty() { return dirty; } @@ -56,12 +69,4 @@ public class CounterCell implements MetricCell<Long> { public Long getCumulative() { return value.get(); } - - public void inc() { - add(1); - } - - public void inc(long n) { - add(n); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java index 6706be8..4e0c15c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; +import java.io.Serializable; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -41,7 +42,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * completed. */ @Experimental(Kind.METRICS) -class DirtyState { +class DirtyState implements Serializable { private enum State { /** Indicates that there have been changes to the MetricCell since last commit. */ DIRTY, http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java index 0f3f6a4..93a3649 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * of indirection. */ @Experimental(Kind.METRICS) -public class DistributionCell implements MetricCell<DistributionData> { +public class DistributionCell implements MetricCell<Distribution, DistributionData> { private final DirtyState dirty = new DirtyState(); private final AtomicReference<DistributionData> value = @@ -42,16 +42,26 @@ public class DistributionCell implements MetricCell<DistributionData> { */ DistributionCell() {} - /** Increment the counter by the given amount. */ + /** Increment the distribution by the given amount. */ public void update(long n) { + update(DistributionData.singleton(n)); + } + + @Override + public void update(DistributionData data) { DistributionData original; do { original = value.get(); - } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n)))); + } while (!value.compareAndSet(original, original.combine(data))); dirty.afterModification(); } @Override + public void update(MetricCell<Distribution, DistributionData> other) { + update(other.getCumulative()); + } + + @Override public DirtyState getDirty() { return dirty; } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java index 6f8e880..0cdd568 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java @@ -29,17 +29,33 @@ import org.apache.beam.sdk.annotations.Experimental; * of indirection. */ @Experimental(Experimental.Kind.METRICS) -public class GaugeCell implements MetricCell<GaugeData> { +public class GaugeCell implements MetricCell<Gauge, GaugeData> { private final DirtyState dirty = new DirtyState(); private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty()); + /** Set the gauge to the given value. */ public void set(long value) { + update(GaugeData.create(value)); + } + + @Override + public void update(GaugeData data) { + GaugeData original; + do { + original = gaugeValue.get(); + } while (!gaugeValue.compareAndSet(original, original.combine(data))); + dirty.afterModification(); + } + + @Override + public void update(MetricCell<Gauge, GaugeData> other) { GaugeData original; do { original = gaugeValue.get(); - } while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value)))); + } while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative()))); dirty.afterModification(); + update(other.getCumulative()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java index 82e30cb..403cac2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.metrics; +import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -24,10 +25,21 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a * specific metric name in a single context. * + * @param <UserT> The type of the user interface for reporting changes to this cell. * @param <DataT> The type of metric data stored (and extracted) from this cell. */ @Experimental(Kind.METRICS) -public interface MetricCell<DataT> { +public interface MetricCell<UserT extends Metric, DataT> extends Serializable { + + /** + * Update value of this cell. + */ + void update(DataT data); + + /** + * Update value of this cell by merging the value of another cell. + */ + void update(MetricCell<UserT, DataT> other); /** * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
