[BEAM-2244] Move details of Metrics to Runners Core
Largeish changes this required were:
- splitting the MetricsContainer into an interface in Java Core with
an implementation in Runners Core
- modifying the various *Cell classes to have a name
- cleaning up dependency cross-fire.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cd98bd9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cd98bd9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cd98bd9
Branch: refs/heads/master
Commit: 8cd98bd968740e6f37e8ec9cf2d273c9f948698b
Parents: 03a7f92
Author: bchambers <[email protected]>
Authored: Tue May 9 15:45:50 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed May 10 17:12:12 2017 -0700
----------------------------------------------------------------------
.../runners/core/metrics/MetricFiltering.java | 102 ++++
.../beam/runners/core/metrics/MetricKey.java | 43 ++
.../beam/runners/core/metrics/package-info.java | 22 +
.../core/metrics/MetricFilteringTest.java | 148 ++++++
.../apache/beam/runners/core/LateDataUtils.java | 4 +-
.../beam/runners/core/metrics/CounterCell.java | 86 ++++
.../beam/runners/core/metrics/DirtyState.java | 99 ++++
.../runners/core/metrics/DistributionCell.java | 80 +++
.../runners/core/metrics/DistributionData.java | 62 +++
.../beam/runners/core/metrics/GaugeCell.java | 78 +++
.../beam/runners/core/metrics/GaugeData.java | 83 ++++
.../beam/runners/core/metrics/MetricCell.java | 42 ++
.../runners/core/metrics/MetricUpdates.java | 79 +++
.../core/metrics/MetricsContainerImpl.java | 188 +++++++
.../core/metrics/MetricsContainerStepMap.java | 495 +++++++++++++++++++
.../beam/runners/core/metrics/MetricsMap.java | 88 ++++
.../beam/runners/core/metrics/package-info.java | 22 +
.../core/LateDataDroppingDoFnRunnerTest.java | 9 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 34 +-
.../runners/core/StatefulDoFnRunnerTest.java | 10 +-
.../runners/core/metrics/CounterCellTest.java | 56 +++
.../runners/core/metrics/DirtyStateTest.java | 56 +++
.../core/metrics/DistributionCellTest.java | 54 ++
.../runners/core/metrics/GaugeCellTest.java | 51 ++
.../core/metrics/MetricUpdateMatchers.java | 82 +++
.../core/metrics/MetricsContainerImplTest.java | 130 +++++
.../metrics/MetricsContainerStepMapTest.java | 272 ++++++++++
.../runners/core/metrics/MetricsMapTest.java | 103 ++++
.../beam/runners/direct/DirectMetrics.java | 14 +-
.../runners/direct/StepTransformResult.java | 2 +-
.../beam/runners/direct/TransformExecutor.java | 10 +-
.../beam/runners/direct/TransformResult.java | 2 +-
.../beam/runners/direct/DirectMetricsTest.java | 14 +-
.../beam/runners/flink/FlinkRunnerResult.java | 4 +-
.../metrics/DoFnRunnerWithMetricsUpdate.java | 3 +-
.../flink/metrics/FlinkMetricContainer.java | 7 +-
.../flink/metrics/MetricsAccumulator.java | 2 +-
.../flink/metrics/ReaderInvocationUtil.java | 5 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../beam/runners/dataflow/DataflowMetrics.java | 4 +-
.../runners/dataflow/DataflowMetricsTest.java | 4 +-
.../beam/runners/spark/SparkPipelineResult.java | 2 +-
.../apache/beam/runners/spark/io/SourceRDD.java | 2 +-
.../runners/spark/io/SparkUnboundedSource.java | 2 +-
.../spark/metrics/MetricsAccumulator.java | 2 +-
.../spark/metrics/MetricsAccumulatorParam.java | 2 +-
.../runners/spark/metrics/SparkBeamMetric.java | 4 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 10 +-
.../spark/stateful/StateSpecFunctions.java | 2 +-
.../translation/DoFnRunnerWithMetrics.java | 5 +-
.../spark/translation/MultiDoFnFunction.java | 2 +-
.../spark/translation/TransformTranslator.java | 2 +-
.../streaming/StreamingTransformTranslator.java | 2 +-
.../spark/io/ReaderToIteratorAdapterTest.java | 4 +-
.../ResumeFromCheckpointStreamingTest.java | 2 +-
.../streaming/StreamingSourceMetricsTest.java | 2 +-
.../apache/beam/sdk/metrics/CounterCell.java | 72 ---
.../org/apache/beam/sdk/metrics/DirtyState.java | 99 ----
.../beam/sdk/metrics/DistributionCell.java | 74 ---
.../beam/sdk/metrics/DistributionData.java | 60 ---
.../beam/sdk/metrics/DistributionResult.java | 3 +
.../org/apache/beam/sdk/metrics/GaugeCell.java | 70 ---
.../org/apache/beam/sdk/metrics/GaugeData.java | 81 ---
.../apache/beam/sdk/metrics/GaugeResult.java | 3 +
.../org/apache/beam/sdk/metrics/Metric.java | 7 +-
.../org/apache/beam/sdk/metrics/MetricCell.java | 53 --
.../beam/sdk/metrics/MetricFiltering.java | 99 ----
.../org/apache/beam/sdk/metrics/MetricKey.java | 41 --
.../apache/beam/sdk/metrics/MetricUpdates.java | 78 ---
.../org/apache/beam/sdk/metrics/Metrics.java | 8 +-
.../beam/sdk/metrics/MetricsContainer.java | 146 +-----
.../sdk/metrics/MetricsContainerStepMap.java | 487 ------------------
.../beam/sdk/metrics/MetricsEnvironment.java | 9 +-
.../org/apache/beam/sdk/metrics/MetricsMap.java | 87 ----
.../apache/beam/sdk/metrics/SinkMetrics.java | 4 +
.../apache/beam/sdk/metrics/SourceMetrics.java | 3 +
.../beam/sdk/metrics/CounterCellTest.java | 55 ---
.../apache/beam/sdk/metrics/DirtyStateTest.java | 56 ---
.../beam/sdk/metrics/DistributionCellTest.java | 53 --
.../apache/beam/sdk/metrics/GaugeCellTest.java | 49 --
.../beam/sdk/metrics/MetricFilteringTest.java | 145 ------
.../apache/beam/sdk/metrics/MetricMatchers.java | 242 ---------
.../beam/sdk/metrics/MetricResultsMatchers.java | 190 +++++++
.../metrics/MetricsContainerStepMapTest.java | 258 ----------
.../beam/sdk/metrics/MetricsContainerTest.java | 129 -----
.../sdk/metrics/MetricsEnvironmentTest.java | 23 +-
.../apache/beam/sdk/metrics/MetricsMapTest.java | 103 ----
.../apache/beam/sdk/metrics/MetricsTest.java | 43 +-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 54 +-
89 files changed, 2902 insertions(+), 2678 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
----------------------------------------------------------------------
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
new file mode 100644
index 0000000..d469d20
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.Objects;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Implements matching for metrics filters. Specifically, matching for metric
name,
+ * namespace, and step name.
+ */
+public class MetricFiltering {
+
+ private MetricFiltering() { }
+
+ /** Matching logic is implemented here rather than in MetricsFilter because
we would like
+ * MetricsFilter to act as a "dumb" value-object, with the possibility of
replacing it with
+ * a Proto/JSON/etc. schema object.
+ * @param filter {@link MetricsFilter} with the matching information of an
actual metric
+ * @param key {@link MetricKey} with the information of a metric
+ * @return whether the filter matches the key or not
+ */
+ public static boolean matches(MetricsFilter filter, MetricKey key) {
+ return filter == null
+ || (matchesName(key.metricName(), filter.names())
+ && matchesScope(key.stepName(), filter.steps()));
+ }
+
+ /**
+ * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
+ * represents a path within {@code haystack}. For example, "foo/bar" is in
"a/foo/bar/b",
+ * but not "a/fool/bar/b" or "a/foo/bart/b".
+ */
+ public static boolean subPathMatches(String haystack, String needle) {
+ int location = haystack.indexOf(needle);
+ int end = location + needle.length();
+ if (location == -1) {
+ return false; // needle not found
+ } else if (location != 0 && haystack.charAt(location - 1) != '/') {
+ return false; // the first entry in needle wasn't exactly matched
+ } else if (end != haystack.length() && haystack.charAt(end) != '/') {
+ return false; // the last entry in needle wasn't exactly matched
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * {@code matchesScope(actualScope, scopes)} returns true if the scope of a
metric is matched
+ * by any of the filters in {@code scopes}. A metric scope is a path of type
"A/B/D". A
+ * path is matched by a filter if the filter is equal to the path (e.g.
"A/B/D", or
+ * if it represents a subpath within it (e.g. "A/B" or "B/D", but not
"A/D"). */
+ public static boolean matchesScope(String actualScope, Set<String> scopes) {
+ if (scopes.isEmpty() || scopes.contains(actualScope)) {
+ return true;
+ }
+
+ // If there is no perfect match, a stage name-level match is tried.
+ // This is done by a substring search over the levels of the scope.
+ // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
+ for (String scope : scopes) {
+ if (subPathMatches(actualScope, scope)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static boolean matchesName(MetricName metricName,
Set<MetricNameFilter> nameFilters) {
+ if (nameFilters.isEmpty()) {
+ return true;
+ }
+ for (MetricNameFilter nameFilter : nameFilters) {
+ if ((nameFilter.getName() == null ||
nameFilter.getName().equals(metricName.name()))
+ && Objects.equal(metricName.namespace(), nameFilter.getNamespace()))
{
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
new file mode 100644
index 0000000..58d4055
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Metrics are keyed by the step name they are associated with and the name of
the metric.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricKey implements Serializable {
+
+ /** The step name that is associated with this metric. */
+ public abstract String stepName();
+
+ /** The name of the metric. */
+ public abstract MetricName metricName();
+
+ public static MetricKey create(String stepName, MetricName metricName) {
+ return new AutoValue_MetricKey(stepName, metricName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
----------------------------------------------------------------------
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
new file mode 100644
index 0000000..263a705
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for runners to implement metrics.
+ */
+package org.apache.beam.runners.core.metrics;
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
----------------------------------------------------------------------
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
new file mode 100644
index 0000000..8953f21
--- /dev/null
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricFiltering}.
+ */
+@RunWith(JUnit4.class)
+public class MetricFilteringTest {
+ private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+
+
+ private boolean matchesSubPath(String actualScope, String subPath) {
+ return MetricFiltering.subPathMatches(actualScope, subPath);
+ }
+
+ @Test
+ public void testMatchCompositeStepNameFilters() {
+ // MetricsFilter with a Class-namespace + name filter + step filter.
+ // Successful match.
+ assertTrue(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+ .addStep("myStep").build(),
+ MetricKey.create(
+ "myBigStep/myStep", MetricName.named(MetricFilteringTest.class,
"myMetricName"))));
+
+ // Unsuccessful match.
+ assertFalse(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+ .addStep("myOtherStep").build(),
+ MetricKey.create(
+ "myOtherStepNoMatch/myStep",
+ MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+ }
+
+ @Test
+ public void testMatchStepNameFilters() {
+ // MetricsFilter with a Class-namespace + name filter + step filter.
+ // Successful match.
+ assertTrue(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+ .addStep("myStep").build(),
+ MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class,
"myMetricName"))));
+
+ // Unsuccessful match.
+ assertFalse(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class, "myMetricName"))
+ .addStep("myOtherStep").build(),
+ MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class,
"myMetricName"))));
+ }
+
+ @Test
+ public void testMatchClassNamespaceFilters() {
+ // MetricsFilter with a Class-namespace + name filter. Without step filter.
+ // Successful match.
+ assertTrue(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class,
"myMetricName")).build(),
+ MetricKey.create("anyStep",
MetricName.named(MetricFilteringTest.class, "myMetricName"))));
+
+ // Unsuccessful match.
+ assertFalse(MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named(MetricFilteringTest.class,
"myMetricName")).build(),
+ MetricKey.create("anyStep", MetricName.named(MetricFiltering.class,
"myMetricName"))));
+ }
+
+ @Test
+ public void testMatchStringNamespaceFilters() {
+ // MetricsFilter with a String-namespace + name filter. Without step
filter.
+ // Successful match.
+ assertTrue(
+ MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named("myNamespace", "myMetricName")).build(),
+ MetricKey.create("anyStep", MetricName.named("myNamespace",
"myMetricName"))));
+
+ // Unsuccessful match.
+ assertFalse(
+ MetricFiltering.matches(
+ MetricsFilter.builder().addNameFilter(
+ MetricNameFilter.named("myOtherNamespace",
"myMetricName")).build(),
+ MetricKey.create("anyStep", MetricName.named("myNamespace",
"myMetricname"))));
+ }
+
+ @Test
+ public void testMatchesSubPath() {
+ assertTrue("Match of the first element",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+ assertTrue("Match of the first elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+ assertTrue("Match of the last elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+ assertFalse("Substring match but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+ assertFalse("Substring match from start - but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+ }
+
+ private boolean matchesScopeWithSingleFilter(String actualScope, String
filter) {
+ Set<String> scopeFilter = new HashSet<String>();
+ scopeFilter.add(filter);
+ return MetricFiltering.matchesScope(actualScope, scopeFilter);
+ }
+
+ @Test
+ public void testMatchesScope() {
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1",
"Top1"));
+ assertTrue(matchesScopeWithSingleFilter(
+ "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1",
"Top1/Outer1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1",
"Top1/Outer1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1",
"Top1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1",
"Top1/Outer1/Inn"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index f7c0d31..732e60c 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -22,7 +22,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
@@ -71,7 +71,7 @@ public class LateDataUtils {
.isBefore(timerInternals.currentInputWatermarkTime());
if (expired) {
// The element is too late for this window.
- droppedDueToLateness.update(1L);
+ droppedDueToLateness.inc();
WindowTracing.debug(
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
+ "window: {} since it is too far behind
inputWatermark: {}",
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
new file mode 100644
index 0000000..4378bb9
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a Counter metric for a specific
context and bundle.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is
within a runner where
+ * a counter is being reported for a specific step (rather than the counter in
the current context).
+ * In that case retrieving the underlying cell and reporting directly to it
avoids a step of
+ * indirection.
+ */
+@Experimental(Kind.METRICS)
+public class CounterCell implements Counter, MetricCell<Long> {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicLong value = new AtomicLong();
+ private final MetricName name;
+
+ /**
+ * Package-visibility because all {@link CounterCell CounterCells} should be
created by
+ * {@link MetricsContainerImpl#getCounter(MetricName)}.
+ */
+ CounterCell(MetricName name) {
+ this.name = name;
+ }
+
+ /**
+ * Increment the counter by the given amount.
+ * @param n value to increment by. Can be negative to decrement.
+ */
+ @Override
+ public void inc(long n) {
+ value.addAndGet(n);
+ dirty.afterModification();
+ }
+
+ public void inc() {
+ inc(1);
+ }
+
+ public void dec() {
+ inc(-1);
+ }
+
+ public void dec(long n) {
+ inc(-1 * n);
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public Long getCumulative() {
+ return value.get();
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
new file mode 100644
index 0000000..532fc2a
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Atomically tracks the dirty-state of a metric.
+ *
+ * <p>Reporting an update is split into two parts such that only changes made
before the call to
+ * {@link #beforeCommit()} are committed when {@link #afterCommit()} is
invoked. This allows for
+ * a two-step commit process of gathering all the dirty updates (calling
{#link beforeCommit()})
+ * followed by committing and calling {#link afterCommit()}.
+ *
+ * <p>The tracking of dirty states is done conservatively -- sometimes {@link
#beforeCommit()}
+ * will return true (indicating a dirty metric) even if there have been no
changes since the last
+ * commit.
+ *
+ * <p>There is also a possible race when the underlying metric is modified but
the call to
+ * {@link #afterModification()} hasn't happened before the call to {@link
#beforeCommit()}. In this
+ * case the next round of metric updating will see the changes. If this was
for the final commit,
+ * then the metric updates shouldn't be extracted until all possible user
modifications have
+ * completed.
+ */
+@Experimental(Kind.METRICS)
+class DirtyState implements Serializable {
+ private enum State {
+ /** Indicates that there have been changes to the MetricCell since last
commit. */
+ DIRTY,
+ /** Indicates that there have been no changes to the MetricCell since last
commit. */
+ CLEAN,
+ /** Indicates that a commit of the current value is in progress. */
+ COMMITTING
+ }
+
+ private final AtomicReference<State> dirty = new
AtomicReference<>(State.DIRTY);
+
+ /**
+ * Indicate that changes have been made to the metric being tracked by this
{@link DirtyState}.
+ *
+ * <p>Should be called <b>after</b> modification of the value.
+ */
+ public void afterModification() {
+ dirty.set(State.DIRTY);
+ }
+
+ /**
+ * Check the dirty state and mark the metric as committing.
+ *
+ * <p>If the state was {@code CLEAN}, this returns {@code false}. If the
state was {@code DIRTY}
+ * or {@code COMMITTING} this returns {@code true} and sets the state to
{@code COMMITTING}.
+ *
+ * @return {@code false} if the state is clean and {@code true} otherwise.
+ */
+ public boolean beforeCommit() {
+ // After this loop, we want the state to be either CLEAN or COMMITTING.
+ // If the state was CLEAN, we don't need to do anything (and exit the loop
early)
+ // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING).
This will only
+ // fail if another thread is getting updates which generally shouldn't be
the case.
+ // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING,
COMMITTING). This will
+ // fail if another thread commits updates (which shouldn't be the case) or
if the user code
+ // updates the metric, in which case it will transition to DIRTY and the
next iteration will
+ // successfully update it.
+ State state;
+ do {
+ state = dirty.get();
+ } while (state != State.CLEAN && !dirty.compareAndSet(state,
State.COMMITTING));
+
+ return state != State.CLEAN;
+ }
+
+ /**
+ * Mark any changes up to the most recently call to {@link #beforeCommit()}}
as committed.
+ * The next call to {@link #beforeCommit()} will return {@code false} unless
there have
+ * been changes made since the previous call to {@link #beforeCommit()}.
+ */
+ public void afterCommit() {
+ dirty.compareAndSet(State.COMMITTING, State.CLEAN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
new file mode 100644
index 0000000..5a5099a
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a Distribution metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is
within a runner where
+ * a distribution is being reported for a specific step (rather than the
distribution in the current
+ * context). In that case retrieving the underlying cell and reporting
directly to it avoids a step
+ * of indirection.
+ */
+@Experimental(Kind.METRICS)
+public class DistributionCell implements Distribution,
MetricCell<DistributionData> {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicReference<DistributionData> value =
+ new AtomicReference<>(DistributionData.EMPTY);
+ private final MetricName name;
+
+ /**
+ * Package-visibility because all {@link DistributionCell DistributionCells}
should be created by
+ * {@link MetricsContainerImpl#getDistribution(MetricName)}.
+ */
+ DistributionCell(MetricName name) {
+ this.name = name;
+ }
+
+ /** Increment the distribution by the given amount. */
+ @Override
+ public void update(long n) {
+ update(DistributionData.singleton(n));
+ }
+
+ void update(DistributionData data) {
+ DistributionData original;
+ do {
+ original = value.get();
+ } while (!value.compareAndSet(original, original.combine(data)));
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public DistributionData getCumulative() {
+ return value.get();
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
new file mode 100644
index 0000000..099e63f
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.DistributionResult;
+
+/**
+ * Data describing the the distribution. This should retain enough detail that
it can be combined
+ * with other {@link DistributionData}.
+ *
+ * <p>This is kept distinct from {@link DistributionResult} since this may be
extended to include
+ * data necessary to approximate quantiles, etc. while {@link
DistributionResult} would just include
+ * the approximate value of those quantiles.
+ */
+@AutoValue
+public abstract class DistributionData implements Serializable {
+
+ public abstract long sum();
+ public abstract long count();
+ public abstract long min();
+ public abstract long max();
+
+ public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE,
Long.MIN_VALUE);
+
+ public static DistributionData create(long sum, long count, long min, long
max) {
+ return new AutoValue_DistributionData(sum, count, min, max);
+ }
+
+ public static DistributionData singleton(long value) {
+ return create(value, 1, value, value);
+ }
+
+ public DistributionData combine(DistributionData value) {
+ return create(
+ sum() + value.sum(),
+ count() + value.count(),
+ Math.min(value.min(), min()),
+ Math.max(value.max(), max()));
+ }
+
+ public DistributionResult extractResult() {
+ return DistributionResult.create(sum(), count(), min(), max());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
new file mode 100644
index 0000000..795e826
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.MetricName;
+
+/**
+ * Tracks the current value (and delta) for a {@link Gauge} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is
within a runner where
+ * a gauge is being reported for a specific step (rather than the gauge in the
current
+ * context). In that case retrieving the underlying cell and reporting
directly to it avoids a step
+ * of indirection.
+ */
+@Experimental(Experimental.Kind.METRICS)
+public class GaugeCell implements Gauge, MetricCell<GaugeData> {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicReference<GaugeData> gaugeValue = new
AtomicReference<>(GaugeData.empty());
+ private final MetricName name;
+
+ /**
+ * Package-visibility because all {@link GaugeCell GaugeCells} should be
created by
+ * {@link MetricsContainerImpl#getGauge(MetricName)}.
+ */
+ GaugeCell(MetricName name) {
+ this.name = name;
+ }
+
+ /** Set the gauge to the given value. */
+ @Override
+ public void set(long value) {
+ update(GaugeData.create(value));
+ }
+
+ void update(GaugeData data) {
+ GaugeData original;
+ do {
+ original = gaugeValue.get();
+ } while (!gaugeValue.compareAndSet(original, original.combine(data)));
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public GaugeData getCumulative() {
+ return gaugeValue.get();
+ }
+
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
new file mode 100644
index 0000000..15a353f
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeData.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.joda.time.Instant;
+
+/**
+ * Data describing the gauge. This should retain enough detail that it can be
combined with
+ * other {@link GaugeData}.
+ */
+@AutoValue
+public abstract class GaugeData implements Serializable {
+
+ public abstract long value();
+
+ public abstract Instant timestamp();
+
+ public static GaugeData create(long value) {
+ return new AutoValue_GaugeData(value, Instant.now());
+ }
+
+ public static GaugeData empty() {
+ return EmptyGaugeData.INSTANCE;
+ }
+
+ public GaugeData combine(GaugeData other) {
+ if (this.timestamp().isAfter(other.timestamp())) {
+ return this;
+ } else {
+ return other;
+ }
+ }
+
+ public GaugeResult extractResult() {
+ return GaugeResult.create(value(), timestamp());
+ }
+
+ /**
+ * Empty {@link GaugeData}, representing no values reported.
+ */
+ public static class EmptyGaugeData extends GaugeData {
+
+ private static final EmptyGaugeData INSTANCE = new EmptyGaugeData();
+ private static final Instant EPOCH = new Instant(0);
+
+ private EmptyGaugeData() {
+ }
+
+ @Override
+ public long value() {
+ return -1L;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return EPOCH;
+ }
+
+ @Override
+ public GaugeResult extractResult() {
+ return GaugeResult.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
new file mode 100644
index 0000000..2b773f0
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricCell.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A {@link MetricCell} is used for accumulating in-memory changes to a
metric. It represents a
+ * specific metric name in a single context.
+ *
+ * @param <DataT> The type of metric data stored (and extracted) from this
cell.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricCell<DataT> extends Serializable {
+ /**
+ * Return the {@link DirtyState} tracking whether this metric cell contains
uncommitted changes.
+ */
+ DirtyState getDirty();
+
+ /**
+ * Return the cumulative value of this metric.
+ */
+ DataT getCumulative();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
new file mode 100644
index 0000000..eae3305
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Representation of multiple metric updates.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricUpdates {
+
+ public static final MetricUpdates EMPTY = MetricUpdates.create(
+ Collections.<MetricUpdate<Long>>emptyList(),
+ Collections.<MetricUpdate<DistributionData>>emptyList(),
+ Collections.<MetricUpdate<GaugeData>>emptyList());
+
+ /**
+ * Representation of a single metric update.
+ * @param <T> The type of value representing the update.
+ */
+ @AutoValue
+ public abstract static class MetricUpdate<T> implements Serializable {
+
+ /** The key being updated. */
+ public abstract MetricKey getKey();
+ /** The value of the update. */
+ public abstract T getUpdate();
+
+ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
+ return new AutoValue_MetricUpdates_MetricUpdate(key, update);
+ }
+ }
+
+ /** Returns true if there are no updates in this MetricUpdates object. */
+ public boolean isEmpty() {
+ return Iterables.isEmpty(counterUpdates())
+ && Iterables.isEmpty(distributionUpdates());
+ }
+
+ /** All of the counter updates. */
+ public abstract Iterable<MetricUpdate<Long>> counterUpdates();
+
+ /** All of the distribution updates. */
+ public abstract Iterable<MetricUpdate<DistributionData>>
distributionUpdates();
+
+ /** All of the gauges updates. */
+ public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();
+
+ /** Create a new {@link MetricUpdates} bundle. */
+ public static MetricUpdates create(
+ Iterable<MetricUpdate<Long>> counterUpdates,
+ Iterable<MetricUpdate<DistributionData>> distributionUpdates,
+ Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
+ return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates,
gaugeUpdates);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
new file mode 100644
index 0000000..6967bf0
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+
+/**
+ * Holds the metrics for a single step and unit-of-commit (bundle).
+ *
+ * <p>This class is thread-safe. It is intended to be used with 1 (or more)
threads are updating
+ * metrics and at-most 1 thread is extracting updates by calling {@link
#getUpdates} and
+ * {@link #commitUpdates}. Outside of this it is still safe. Although races in
the update extraction
+ * may cause updates that don't actually have any changes, it will never lose
an update.
+ *
+ * <p>For consistency, all threads that update metrics should finish before
getting the final
+ * cumulative values/updates.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsContainerImpl implements Serializable, MetricsContainer {
+
+ private final String stepName;
+
+ private MetricsMap<MetricName, CounterCell> counters =
+ new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
+ @Override
+ public CounterCell createInstance(MetricName name) {
+ return new CounterCell(name);
+ }
+ });
+
+ private MetricsMap<MetricName, DistributionCell> distributions =
+ new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
+ @Override
+ public DistributionCell createInstance(MetricName name) {
+ return new DistributionCell(name);
+ }
+ });
+
+ private MetricsMap<MetricName, GaugeCell> gauges =
+ new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() {
+ @Override
+ public GaugeCell createInstance(MetricName name) {
+ return new GaugeCell(name);
+ }
+ });
+
+ /**
+ * Create a new {@link MetricsContainerImpl} associated with the given
{@code stepName}.
+ */
+ public MetricsContainerImpl(String stepName) {
+ this.stepName = stepName;
+ }
+
+ @Override
+ public CounterCell getCounter(MetricName metricName) {
+ return counters.get(metricName);
+ }
+
+ @Override
+ public DistributionCell getDistribution(MetricName metricName) {
+ return distributions.get(metricName);
+ }
+
+ @Override
+ public GaugeCell getGauge(MetricName metricName) {
+ return gauges.get(metricName);
+ }
+
+ private <UpdateT, CellT extends MetricCell<UpdateT>>
+ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName,
CellT> cells) {
+ ImmutableList.Builder<MetricUpdate<UpdateT>> updates =
ImmutableList.builder();
+ for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+ if (cell.getValue().getDirty().beforeCommit()) {
+ updates.add(MetricUpdate.create(MetricKey.create(stepName,
cell.getKey()),
+ cell.getValue().getCumulative()));
+ }
+ }
+ return updates.build();
+ }
+
+ /**
+ * Return the cumulative values for any metrics that have changed since the
last time updates were
+ * committed.
+ */
+ public MetricUpdates getUpdates() {
+ return MetricUpdates.create(
+ extractUpdates(counters),
+ extractUpdates(distributions),
+ extractUpdates(gauges));
+ }
+
+ private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>>
cells) {
+ for (MetricCell<?> cell : cells.values()) {
+ cell.getDirty().afterCommit();
+ }
+ }
+
+ /**
+ * Mark all of the updates that were retrieved with the latest call to
{@link #getUpdates()} as
+ * committed.
+ */
+ public void commitUpdates() {
+ commitUpdates(counters);
+ commitUpdates(distributions);
+ commitUpdates(gauges);
+ }
+
+ private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
+ ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
+ MetricsMap<MetricName, CellT> cells) {
+ ImmutableList.Builder<MetricUpdate<UpdateT>> updates =
ImmutableList.builder();
+ for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+ UpdateT update = checkNotNull(cell.getValue().getCumulative());
+ updates.add(MetricUpdate.create(MetricKey.create(stepName,
cell.getKey()), update));
+ }
+ return updates.build();
+ }
+
+ /**
+ * Return the {@link MetricUpdates} representing the cumulative values of
all metrics in this
+ * container.
+ */
+ public MetricUpdates getCumulative() {
+ return MetricUpdates.create(
+ extractCumulatives(counters),
+ extractCumulatives(distributions),
+ extractCumulatives(gauges));
+ }
+
+ /**
+ * Update values of this {@link MetricsContainerImpl} by merging the value
of another cell.
+ */
+ public void update(MetricsContainerImpl other) {
+ updateCounters(counters, other.counters);
+ updateDistributions(distributions, other.distributions);
+ updateGauges(gauges, other.gauges);
+ }
+
+ private void updateCounters(
+ MetricsMap<MetricName, CounterCell> current,
+ MetricsMap<MetricName, CounterCell> updates) {
+ for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) {
+ current.get(counter.getKey()).inc(counter.getValue().getCumulative());
+ }
+ }
+
+ private void updateDistributions(
+ MetricsMap<MetricName, DistributionCell> current,
+ MetricsMap<MetricName, DistributionCell> updates) {
+ for (Map.Entry<MetricName, DistributionCell> counter : updates.entries()) {
+ current.get(counter.getKey()).update(counter.getValue().getCumulative());
+ }
+ }
+
+ private void updateGauges(
+ MetricsMap<MetricName, GaugeCell> current,
+ MetricsMap<MetricName, GaugeCell> updates) {
+ for (Map.Entry<MetricName, GaugeCell> counter : updates.entries()) {
+ current.get(counter.getKey()).update(counter.getValue().getCumulative());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
new file mode 100644
index 0000000..68238e4
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Metrics containers by step.
+ *
+ * <p>This class is not thread-safe.</p>
+ */
+public class MetricsContainerStepMap implements Serializable {
+ private Map<String, MetricsContainerImpl> metricsContainers;
+
+ public MetricsContainerStepMap() {
+ this.metricsContainers = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Returns the container for the given step name.
+ */
+ public MetricsContainerImpl getContainer(String stepName) {
+ if (!metricsContainers.containsKey(stepName)) {
+ metricsContainers.put(stepName, new MetricsContainerImpl(stepName));
+ }
+ return metricsContainers.get(stepName);
+ }
+
+ /**
+ * Update this {@link MetricsContainerStepMap} with all values from given
+ * {@link MetricsContainerStepMap}.
+ */
+ public void updateAll(MetricsContainerStepMap other) {
+ for (Map.Entry<String, MetricsContainerImpl> container :
other.metricsContainers.entrySet()) {
+ getContainer(container.getKey()).update(container.getValue());
+ }
+ }
+
+ /**
+ * Update {@link MetricsContainerImpl} for given step in this map with all
values from given
+ * {@link MetricsContainerImpl}.
+ */
+ public void update(String step, MetricsContainerImpl container) {
+ getContainer(step).update(container);
+ }
+
+ /**
+ * Returns {@link MetricResults} based on given
+ * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and
committed metrics.
+ *
+ * <p>This constructor is intended for runners which support both attempted
and committed
+ * metrics.
+ */
+ public static MetricResults asMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers) {
+ return new MetricsContainerStepMapMetricResults(
+ attemptedMetricsContainers,
+ committedMetricsContainers);
+ }
+
+ /**
+ * Returns {@link MetricResults} based on given {@link
MetricsContainerStepMap} of attempted
+ * metrics.
+ *
+ * <p>This constructor is intended for runners which only support
`attempted` metrics.
+ * Accessing {@link MetricResult#committed()} in the resulting {@link
MetricResults} will result
+ * in an {@link UnsupportedOperationException}.</p>
+ */
+ public static MetricResults asAttemptedOnlyMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers) {
+ return new
MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
+ }
+
+ private Map<String, MetricsContainerImpl> getMetricsContainers() {
+ return metricsContainers;
+ }
+
+ private static class MetricsContainerStepMapMetricResults extends
MetricResults {
+ private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new
HashMap<>();
+ private final Map<MetricKey, AttemptedAndCommitted<DistributionData>>
distributions =
+ new HashMap<>();
+ private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges =
new HashMap<>();
+ private final boolean isCommittedSupported;
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers) {
+ this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
+ }
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers) {
+ this(attemptedMetricsContainers, committedMetricsContainers, true);
+ }
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers,
+ boolean isCommittedSupported) {
+ for (MetricsContainerImpl container
+ : attemptedMetricsContainers.getMetricsContainers().values()) {
+ MetricUpdates cumulative = container.getCumulative();
+ mergeCounters(counters, cumulative.counterUpdates(),
attemptedCounterUpdateFn());
+ mergeDistributions(distributions, cumulative.distributionUpdates(),
+ attemptedDistributionUpdateFn());
+ mergeGauges(gauges, cumulative.gaugeUpdates(),
attemptedGaugeUpdateFn());
+ }
+ for (MetricsContainerImpl container
+ : committedMetricsContainers.getMetricsContainers().values()) {
+ MetricUpdates cumulative = container.getCumulative();
+ mergeCounters(counters, cumulative.counterUpdates(),
committedCounterUpdateFn());
+ mergeDistributions(distributions, cumulative.distributionUpdates(),
+ committedDistributionUpdateFn());
+ mergeGauges(gauges, cumulative.gaugeUpdates(),
committedGaugeUpdateFn());
+ }
+ this.isCommittedSupported = isCommittedSupported;
+ }
+
+ private Function<MetricUpdate<DistributionData>,
AttemptedAndCommitted<DistributionData>>
+ attemptedDistributionUpdateFn() {
+ return new Function<MetricUpdate<DistributionData>,
+ AttemptedAndCommitted<DistributionData>>() {
+ @Override
+ public AttemptedAndCommitted<DistributionData>
apply(MetricUpdate<DistributionData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, DistributionData.EMPTY));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<DistributionData>,
AttemptedAndCommitted<DistributionData>>
+ committedDistributionUpdateFn() {
+ return new Function<MetricUpdate<DistributionData>,
+ AttemptedAndCommitted<DistributionData>>() {
+ @Override
+ public AttemptedAndCommitted<DistributionData>
apply(MetricUpdate<DistributionData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, DistributionData.EMPTY),
+ input);
+ }
+ };
+ }
+
+ private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ attemptedGaugeUpdateFn() {
+ return new Function<MetricUpdate<GaugeData>,
AttemptedAndCommitted<GaugeData>>() {
+ @Override
+ public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData>
input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, GaugeData.empty()));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ committedGaugeUpdateFn() {
+ return new Function<MetricUpdate<GaugeData>,
AttemptedAndCommitted<GaugeData>>() {
+ @Override
+ public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData>
input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, GaugeData.empty()),
+ input);
+ }
+ };
+ }
+
+ private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>
attemptedCounterUpdateFn() {
+ return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+ @Override
+ public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, 0L));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>
committedCounterUpdateFn() {
+ return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+ @Override
+ public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, 0L),
+ input);
+ }
+ };
+ }
+
+ @Override
+ public MetricQueryResults queryMetrics(MetricsFilter filter) {
+ return new QueryResults(filter);
+ }
+
+ private class QueryResults implements MetricQueryResults {
+ private final MetricsFilter filter;
+
+ private QueryResults(MetricsFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public Iterable<MetricResult<Long>> counters() {
+ return
+ FluentIterable
+ .from(counters.values())
+ .filter(matchesFilter(filter))
+ .transform(counterUpdateToResult())
+ .toList();
+ }
+
+ @Override
+ public Iterable<MetricResult<DistributionResult>> distributions() {
+ return
+ FluentIterable
+ .from(distributions.values())
+ .filter(matchesFilter(filter))
+ .transform(distributionUpdateToResult())
+ .toList();
+ }
+
+ @Override
+ public Iterable<MetricResult<GaugeResult>> gauges() {
+ return
+ FluentIterable
+ .from(gauges.values())
+ .filter(matchesFilter(filter))
+ .transform(gaugeUpdateToResult())
+ .toList();
+ }
+
+ private Predicate<AttemptedAndCommitted<?>> matchesFilter(final
MetricsFilter filter) {
+ return new Predicate<AttemptedAndCommitted<?>>() {
+ @Override
+ public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted)
{
+ return MetricFiltering.matches(filter,
attemptedAndCommitted.getKey());
+ }
+ };
+ }
+ }
+
+ private Function<AttemptedAndCommitted<Long>, MetricResult<Long>>
counterUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
+ @Override
+ public MetricResult<Long>
+ apply(AttemptedAndCommitted<Long> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ private Function<AttemptedAndCommitted<DistributionData>,
MetricResult<DistributionResult>>
+ distributionUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<DistributionData>,
MetricResult<DistributionResult>>() {
+ @Override
+ public MetricResult<DistributionResult>
+ apply(AttemptedAndCommitted<DistributionData> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate().extractResult(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate().extractResult()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ private Function<AttemptedAndCommitted<GaugeData>,
MetricResult<GaugeResult>>
+ gaugeUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<GaugeData>,
MetricResult<GaugeResult>>() {
+ @Override
+ public MetricResult<GaugeResult>
+ apply(AttemptedAndCommitted<GaugeData> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate().extractResult(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate().extractResult()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeCounters(
+ Map<MetricKey, AttemptedAndCommitted<Long>> counters,
+ Iterable<MetricUpdate<Long>> updates,
+ Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>
updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<Long> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<Long> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (counters.containsKey(key)) {
+ AttemptedAndCommitted<Long> current = counters.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+ update.getAttempted().getUpdate() +
current.getAttempted().getUpdate()),
+ MetricUpdate.create(
+ key,
+ update.getCommitted().getUpdate() +
current.getCommitted().getUpdate()));
+ }
+ counters.put(key, update);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeDistributions(
+ Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
+ Iterable<MetricUpdate<DistributionData>> updates,
+ Function<MetricUpdate<DistributionData>,
AttemptedAndCommitted<DistributionData>>
+ updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<DistributionData> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<DistributionData> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (distributions.containsKey(key)) {
+ AttemptedAndCommitted<DistributionData> current =
distributions.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+ MetricUpdate.create(
+ key,
+
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+ }
+ distributions.put(key, update);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeGauges(
+ Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
+ Iterable<MetricUpdate<GaugeData>> updates,
+ Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<GaugeData> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<GaugeData> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (gauges.containsKey(key)) {
+ AttemptedAndCommitted<GaugeData> current = gauges.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+ MetricUpdate.create(
+ key,
+
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+ }
+ gauges.put(key, update);
+ }
+ }
+
+ /**
+ * Accumulated implementation of {@link MetricResult}.
+ */
+ private static class AccumulatedMetricResult<T> implements MetricResult<T>
{
+ private final MetricName name;
+ private final String step;
+ private final T attempted;
+ private final T committed;
+ private final boolean isCommittedSupported;
+
+ private AccumulatedMetricResult(
+ MetricName name,
+ String step,
+ T attempted,
+ T committed,
+ boolean isCommittedSupported) {
+ this.name = name;
+ this.step = step;
+ this.attempted = attempted;
+ this.committed = committed;
+ this.isCommittedSupported = isCommittedSupported;
+ }
+
+ @Override
+ public MetricName name() {
+ return name;
+ }
+
+ @Override
+ public String step() {
+ return step;
+ }
+
+ @Override
+ public T committed() {
+ if (!isCommittedSupported) {
+ throw new UnsupportedOperationException("This runner does not
currently support committed"
+ + " metrics results. Please use 'attempted' instead.");
+ }
+ return committed;
+ }
+
+ @Override
+ public T attempted() {
+ return attempted;
+ }
+ }
+
+ /**
+ * Attempted and committed {@link MetricUpdate MetricUpdates}.
+ */
+ private static class AttemptedAndCommitted<T> {
+ private final MetricKey key;
+ private final MetricUpdate<T> attempted;
+ private final MetricUpdate<T> committed;
+
+ private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
+ MetricUpdate<T> committed) {
+ this.key = key;
+ this.attempted = attempted;
+ this.committed = committed;
+ }
+
+ private MetricKey getKey() {
+ return key;
+ }
+
+ private MetricUpdate<T> getAttempted() {
+ return attempted;
+ }
+
+ private MetricUpdate<T> getCommitted() {
+ return committed;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
new file mode 100644
index 0000000..9f08076
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A map from {@code K} to {@code T} that supports getting or creating values
associated with a key
+ * in a thread-safe manner.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsMap<K, T> implements Serializable {
+
+ /** Interface for creating instances to populate the {@link MetricsMap}. */
+ public interface Factory<K, T> extends Serializable {
+ /**
+ * Create an instance of {@code T} to use with the given {@code key}.
+ *
+ * <p>It must be safe to call this from multiple threads.
+ */
+ T createInstance(K key);
+ }
+
+ private final Factory<K, T> factory;
+ private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
+
+ public MetricsMap(Factory<K, T> factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Get or create the value associated with the given key.
+ */
+ public T get(K key) {
+ T metric = metrics.get(key);
+ if (metric == null) {
+ metric = factory.createInstance(key);
+ metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric),
metric);
+ }
+ return metric;
+ }
+
+ /**
+ * Get the value associated with the given key, if it exists.
+ */
+ @Nullable
+ public T tryGet(K key) {
+ return metrics.get(key);
+ }
+
+ /**
+ * Return an iterable over the entries in the current {@link MetricsMap}.
+ */
+ public Iterable<Map.Entry<K, T>> entries() {
+ return Iterables.unmodifiableIterable(metrics.entrySet());
+ }
+
+ /**
+ * Return an iterable over the values in the current {@link MetricsMap}.
+ */
+ public Iterable<T> values() {
+ return Iterables.unmodifiableIterable(metrics.values());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
new file mode 100644
index 0000000..263a705
--- /dev/null
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for runners to implement metrics.
+ */
+package org.apache.beam.runners.core.metrics;
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 88c6ab6..9ee79d7 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Arrays;
import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -58,7 +58,8 @@ public class LateDataDroppingDoFnRunnerTest {
@Test
public void testLateDataFilter() throws Exception {
- MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+ MetricsContainerImpl container = new MetricsContainerImpl("any");
+ MetricsEnvironment.setCurrentContainer(container);
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new
Instant(15L));
LateDataFilter lateDataFilter = new LateDataFilter(
@@ -77,14 +78,14 @@ public class LateDataDroppingDoFnRunnerTest {
createDatum(16, 16L),
createDatum(18, 18L));
assertThat(expected, containsInAnyOrder(Iterables.toArray(actual,
WindowedValue.class)));
- long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+ long droppedValues = container.getCounter(
MetricName.named(LateDataDroppingDoFnRunner.class,
LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
.getCumulative().longValue();
assertEquals(1, droppedValues);
// Ensure that reiterating returns the same results and doesn't increment
the counter again.
assertThat(expected, containsInAnyOrder(Iterables.toArray(actual,
WindowedValue.class)));
- droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+ droppedValues = container.getCounter(
MetricName.named(LateDataDroppingDoFnRunner.class,
LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
.getCumulative().longValue();