Simplify the API for managing MetricsEnvironment 1. setCurrentContainer returns the previous MetricsContainer 2. setCurrentContainer(null) resets the thread local 3. scopedMetricsContainer sets the container and returns a Closeable that restores the previous container.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6870a6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6870a6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6870a6d Branch: refs/heads/gearpump-runner Commit: e6870a6dc10e4ad52a911c316137a9f7731a9194 Parents: 6ec45f7 Author: bchambers <[email protected]> Authored: Tue Nov 22 11:37:23 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Nov 22 12:27:41 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/TransformExecutor.java | 5 +- .../beam/sdk/metrics/MetricsEnvironment.java | 60 +++++++++++++++----- .../sdk/metrics/MetricsEnvironmentTest.java | 8 +-- .../apache/beam/sdk/metrics/MetricsTest.java | 6 +- 4 files changed, 56 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 1704955..fb31cc9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; @@ -89,8 +90,7 @@ class TransformExecutor<T> implements Runnable { @Override public void run() { MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); - 
MetricsEnvironment.setMetricsContainer(metricsContainer); - try { + try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { Collection<ModelEnforcement<T>> enforcements = new ArrayList<>(); for (ModelEnforcementFactory enforcementFactory : modelEnforcements) { ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform); @@ -117,7 +117,6 @@ class TransformExecutor<T> implements Runnable { // Report the physical metrics from the end of this step. context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); - MetricsEnvironment.unsetMetricsContainer(); transformEvaluationState.complete(this); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index ef2660a8..7c06cbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.metrics; +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory; * returned objects to create and modify metrics. * * <p>The runner should create {@link MetricsContainer} for each context in which metrics are - * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that - * may update metrics within that step. + * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that + * may update metrics within that step. 
It should call {@link #setCurrentContainer} again to restore + * the previous container. * - * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to - * the previous value) when exiting code that set the metrics container. + * <p>Alternatively, the runner can use {@link #scopedMetricsContainer(MetricsContainer)} to set the + * container for the current thread and get a {@link Closeable} that will restore the previous + * container when closed. */ public class MetricsEnvironment { @@ -45,15 +49,20 @@ public class MetricsEnvironment { private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD = new ThreadLocal<MetricsContainer>(); - /** Set the {@link MetricsContainer} for the current thread. */ - public static void setMetricsContainer(MetricsContainer container) { - CONTAINER_FOR_THREAD.set(container); - } - - - /** Clear the {@link MetricsContainer} for the current thread. */ - public static void unsetMetricsContainer() { - CONTAINER_FOR_THREAD.remove(); + /** + * Set the {@link MetricsContainer} for the current thread. + * + * @return The previous container for the current thread. + */ + @Nullable + public static MetricsContainer setCurrentContainer(@Nullable MetricsContainer container) { + MetricsContainer previous = getCurrentContainer(); + if (container == null) { + CONTAINER_FOR_THREAD.remove(); + } else { + CONTAINER_FOR_THREAD.set(container); + } + return previous; } /** Called by the run to indicate whether metrics reporting is supported. */ @@ -62,6 +71,31 @@ public class MetricsEnvironment { } /** + * Set the {@link MetricsContainer} for the current thread. + * + * @return A {@link Closeable} that will reset the current container to the previous + * {@link MetricsContainer} when closed. 
+ */ + public static Closeable scopedMetricsContainer(MetricsContainer container) { + return new ScopedContainer(container); + } + + private static class ScopedContainer implements Closeable { + + @Nullable + private final MetricsContainer oldContainer; + + private ScopedContainer(MetricsContainer newContainer) { + this.oldContainer = setCurrentContainer(newContainer); + } + + @Override + public void close() throws IOException { + setCurrentContainer(oldContainer); + } + } + + /** * Return the {@link MetricsContainer} for the current thread. * * <p>May return null if metrics are not supported by the current runner or if the current thread http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java index 4200a20..0ce17b4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java @@ -35,7 +35,7 @@ import org.junit.runners.JUnit4; public class MetricsEnvironmentTest { @After public void teardown() { - MetricsEnvironment.unsetMetricsContainer(); + MetricsEnvironment.setCurrentContainer(null); } @Test @@ -44,11 +44,11 @@ public class MetricsEnvironmentTest { MetricsContainer c1 = new MetricsContainer("step1"); MetricsContainer c2 = new MetricsContainer("step2"); - MetricsEnvironment.setMetricsContainer(c1); + MetricsEnvironment.setCurrentContainer(c1); counter.inc(); - MetricsEnvironment.setMetricsContainer(c2); + MetricsEnvironment.setCurrentContainer(c2); counter.dec(); - MetricsEnvironment.unsetMetricsContainer(); + MetricsEnvironment.setCurrentContainer(null); MetricUpdates updates1 = 
c1.getUpdates(); MetricUpdates updates2 = c2.getUpdates(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index d11b44d..732cb34 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -37,7 +37,7 @@ public class MetricsTest { @After public void tearDown() { - MetricsEnvironment.unsetMetricsContainer(); + MetricsEnvironment.setCurrentContainer(null); } @Test @@ -61,7 +61,7 @@ public class MetricsTest { @Test public void distributionToCell() { MetricsContainer container = new MetricsContainer("step"); - MetricsEnvironment.setMetricsContainer(container); + MetricsEnvironment.setCurrentContainer(container); Distribution distribution = Metrics.distribution(NS, NAME); @@ -80,7 +80,7 @@ public class MetricsTest { @Test public void counterToCell() { MetricsContainer container = new MetricsContainer("step"); - MetricsEnvironment.setMetricsContainer(container); + MetricsEnvironment.setCurrentContainer(container); Counter counter = Metrics.counter(NS, NAME); CounterCell cell = container.getCounter(METRIC_NAME); counter.inc();
