Repository: beam
Updated Branches:
  refs/heads/master 860ac1d1f -> 6ea4b3a42


[BEAM-1334] Split UsesMetrics category and tests into UsesCommittedMetrics and 
UsesAttemptedMetrics


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6e5c341b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6e5c341b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6e5c341b

Branch: refs/heads/master
Commit: 6e5c341b18e512616f51562c5c76992a0923ff42
Parents: 860ac1d
Author: Aviem Zur <[email protected]>
Authored: Sat Jan 28 17:08:58 2017 +0200
Committer: bchambers <[email protected]>
Committed: Wed Feb 1 16:57:57 2017 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  3 +-
 .../beam/runners/direct/DirectMetricsTest.java  | 58 +++++++++----
 runners/flink/runner/pom.xml                    |  6 +-
 runners/google-cloud-dataflow-java/pom.xml      |  3 +-
 runners/spark/pom.xml                           |  3 +-
 .../beam/sdk/testing/UsesAttemptedMetrics.java  | 28 ++++++
 .../beam/sdk/testing/UsesCommittedMetrics.java  | 28 ++++++
 .../apache/beam/sdk/testing/UsesMetrics.java    | 24 ------
 .../apache/beam/sdk/metrics/MetricMatchers.java | 90 ++++++++++++++------
 .../apache/beam/sdk/metrics/MetricsTest.java    | 75 +++++++++++-----
 10 files changed, 224 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5e16083..002bf9a 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -188,7 +188,8 @@
                 org.apache.beam.sdk.testing.UsesStatefulParDo,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo,
-                org.apache.beam.sdk.testing.UsesMetrics
+                org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                org.apache.beam.sdk.testing.UsesCommittedMetrics
               </excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index df01244..3ad2bdc 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.beam.runners.direct;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static 
org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static 
org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
 import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -63,8 +64,9 @@ public class DirectMetricsTest {
     MockitoAnnotations.initMocks(this);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
-  public void testApplyLogicalQueryNoFilter() {
+  public void testApplyCommittedNoFilter() {
     metrics.commitLogical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
@@ -82,17 +84,22 @@ public class DirectMetricsTest {
 
     MetricQueryResults results = 
metrics.queryMetrics(MetricsFilter.builder().build());
     assertThat(results.counters(), containsInAnyOrder(
-        metricResult("ns1", "name1", "step1", 5L, 0L),
-        metricResult("ns1", "name2", "step1", 12L, 0L),
-        metricResult("ns1", "name1", "step2", 7L, 0L)));
+        attemptedMetricsResult("ns1", "name1", "step1", 0L),
+        attemptedMetricsResult("ns1", "name2", "step1", 0L),
+        attemptedMetricsResult("ns1", "name1", "step2", 0L)));
+    assertThat(results.counters(), containsInAnyOrder(
+        committedMetricsResult("ns1", "name1", "step1", 5L),
+        committedMetricsResult("ns1", "name2", "step1", 12L),
+        committedMetricsResult("ns1", "name1", "step2", 7L)));
+    assertThat(results.distributions(), contains(
+        attemptedMetricsResult("ns1", "name1", "step1", 
DistributionResult.ZERO)));
     assertThat(results.distributions(), contains(
-        metricResult("ns1", "name1", "step1",
-            DistributionResult.create(12, 3, 3, 5),
-            DistributionResult.ZERO)));
+        committedMetricsResult("ns1", "name1", "step1", 
DistributionResult.create(12, 3, 3, 5))));
   }
 
+  @SuppressWarnings("unchecked")
   @Test
-  public void testApplyPhysicalCountersQueryOneNamespace() {
+  public void testApplyAttemptedCountersQueryOneNamespace() {
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
@@ -104,15 +111,23 @@ public class DirectMetricsTest {
             MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
 
-    assertThat(metrics.queryMetrics(
-        
MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()).counters(),
+    MetricQueryResults results = metrics.queryMetrics(
+        MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build());
+
+    assertThat(results.counters(),
+        containsInAnyOrder(
+            attemptedMetricsResult("ns1", "name1", "step1", 5L),
+            attemptedMetricsResult("ns1", "name1", "step2", 7L)));
+
+    assertThat(results.counters(),
         containsInAnyOrder(
-            metricResult("ns1", "name1", "step1", 0L, 5L),
-            metricResult("ns1", "name1", "step2", 0L, 7L)));
+            committedMetricsResult("ns1", "name1", "step1", 0L),
+            committedMetricsResult("ns1", "name1", "step2", 0L)));
   }
 
+  @SuppressWarnings("unchecked")
   @Test
-  public void testApplyPhysicalQueryCompositeScope() {
+  public void testApplyAttemptedQueryCompositeScope() {
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
@@ -124,10 +139,17 @@ public class DirectMetricsTest {
             MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 
18L)),
         ImmutableList.<MetricUpdate<DistributionData>>of()));
 
-    assertThat(metrics.queryMetrics(
-        MetricsFilter.builder().addStep("Outer1").build()).counters(),
+    MetricQueryResults results = metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Outer1").build());
+
+    assertThat(results.counters(),
+        containsInAnyOrder(
+            attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
+            attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
+
+    assertThat(results.counters(),
         containsInAnyOrder(
-            metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L),
-            metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L)));
+            committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
+            committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 6a7cbff..fe058b5 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -58,7 +58,8 @@
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesMetrics
+                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
@@ -90,7 +91,8 @@
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesMetrics
+                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index f17eb78..35f31b9 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,7 +78,8 @@
             <id>runnable-on-service-tests</id>
             <configuration>
               <excludedGroups>
-                org.apache.beam.sdk.testing.UsesMetrics,
+                org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                org.apache.beam.sdk.testing.UsesCommittedMetrics,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo,
                 org.apache.beam.sdk.testing.UsesUnboundedPCollections,

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 5d46f8d..ffe6dfd 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,8 @@
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesMetrics
+                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics
                   </excludedGroups>
                   <forkCount>1</forkCount>
                   <reuseForks>false</reuseForks>

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java
new file mode 100644
index 0000000..0a17980
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesAttemptedMetrics.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.metrics.MetricResult;
+
+/**
+ * Category tag for validation tests which utilize {@link 
org.apache.beam.sdk.metrics.Metrics}.
+ * Tests tagged with {@link UsesAttemptedMetrics} should be run for runners 
which support
+ * {@link MetricResult#attempted()}.
+ */
+public class UsesAttemptedMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java
new file mode 100644
index 0000000..c22f397
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCommittedMetrics.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.metrics.MetricResult;
+
+/**
+ * Category tag for validation tests which utilize {@link 
org.apache.beam.sdk.metrics.Metrics}.
+ * Tests tagged with {@link UsesCommittedMetrics} should be run for runners 
which support
+ * {@link MetricResult#committed()}.
+ */
+public interface UsesCommittedMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
deleted file mode 100644
index 261354c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.testing;
-
-/**
- * Category tag for validation tests which utilize {@link 
org.apache.beam.sdk.metrics.Metrics}.
- */
-public interface UsesMetrics {}

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 798d9d4..b8109b3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -29,7 +29,7 @@ import org.hamcrest.TypeSafeMatcher;
  */
 public class MetricMatchers {
 
-  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, 
final T update) {
+  static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T 
update) {
     return new TypeSafeMatcher<MetricUpdate<T>>() {
       @Override
       protected boolean matchesSafely(MetricUpdate<T> item) {
@@ -47,7 +47,7 @@ public class MetricMatchers {
     };
   }
 
-  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
+  static <T> Matcher<MetricUpdate<T>> metricUpdate(
       final String namespace, final String name, final String step, final T 
update) {
     return new TypeSafeMatcher<MetricUpdate<T>>() {
       @Override
@@ -70,16 +70,14 @@ public class MetricMatchers {
     };
   }
 
-  public static <T> Matcher<MetricResult<T>> metricResult(
-      final String namespace, final String name, final String step,
-      final T committed, final T attempted) {
+  public static <T> Matcher<MetricResult<T>> attemptedMetricsResult(
+      final String namespace, final String name, final String step, final T 
attempted) {
     return new TypeSafeMatcher<MetricResult<T>>() {
       @Override
       protected boolean matchesSafely(MetricResult<T> item) {
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
             && item.step().contains(step)
-            && Objects.equals(committed, item.committed())
             && Objects.equals(attempted, item.attempted());
       }
 
@@ -89,7 +87,6 @@ public class MetricMatchers {
             .appendText("MetricResult{inNamespace=").appendValue(namespace)
             .appendText(", name=").appendValue(name)
             .appendText(", step=").appendValue(step)
-            .appendText(", committed=").appendValue(committed)
             .appendText(", attempted=").appendValue(attempted)
             .appendText("}");
       }
@@ -97,38 +94,81 @@ public class MetricMatchers {
       @Override
       protected void describeMismatchSafely(MetricResult<T> item, Description 
mismatchDescription) {
         mismatchDescription.appendText("MetricResult{");
-        if (!Objects.equals(namespace, item.name().namespace())) {
-          mismatchDescription
-              .appendText("inNamespace: ").appendValue(namespace)
-              .appendText(" != ").appendValue(item.name().namespace());
-        }
 
-        if (!Objects.equals(name, item.name().name())) {
-          mismatchDescription
-              .appendText("name: ").appendValue(name)
-              .appendText(" != ").appendValue(item.name().name());
-        }
+        describeMetricsResultMembersMismatch(item, mismatchDescription, 
namespace, name, step);
 
-        if (!item.step().contains(step)) {
+        if (!Objects.equals(attempted, item.attempted())) {
           mismatchDescription
-              .appendText("step: ").appendValue(step)
-              .appendText(" != ").appendValue(item.step());
+              .appendText("attempted: ").appendValue(attempted)
+              .appendText(" != ").appendValue(item.attempted());
         }
 
+        mismatchDescription.appendText("}");
+      }
+    };
+  }
+
+  public static <T> Matcher<MetricResult<T>> committedMetricsResult(
+      final String namespace, final String name, final String step,
+      final T committed) {
+    return new TypeSafeMatcher<MetricResult<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<T> item) {
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && item.step().contains(step)
+            && Objects.equals(committed, item.committed());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", committed=").appendValue(committed)
+            .appendText("}");
+      }
+
+      @Override
+      protected void describeMismatchSafely(MetricResult<T> item, Description 
mismatchDescription) {
+        mismatchDescription.appendText("MetricResult{");
+
+        describeMetricsResultMembersMismatch(item, mismatchDescription, 
namespace, name, step);
+
         if (!Objects.equals(committed, item.committed())) {
           mismatchDescription
               .appendText("committed: ").appendValue(committed)
               .appendText(" != ").appendValue(item.committed());
         }
 
-        if (!Objects.equals(attempted, item.attempted())) {
-          mismatchDescription
-              .appendText("attempted: ").appendValue(attempted)
-              .appendText(" != ").appendValue(item.attempted());
-        }
         mismatchDescription.appendText("}");
       }
     };
   }
 
+  private static <T> void describeMetricsResultMembersMismatch(
+      MetricResult<T> item,
+      Description mismatchDescription,
+      String namespace,
+      String name,
+      String step) {
+    if (!Objects.equals(namespace, item.name().namespace())) {
+      mismatchDescription
+          .appendText("inNamespace: ").appendValue(namespace)
+          .appendText(" != ").appendValue(item.name().namespace());
+    }
+
+    if (!Objects.equals(name, item.name().name())) {
+      mismatchDescription
+          .appendText("name: ").appendValue(name)
+          .appendText(" != ").appendValue(item.name().name());
+    }
+
+    if (!item.step().contains(step)) {
+      mismatchDescription
+          .appendText("step: ").appendValue(step)
+          .appendText(" != ").appendValue(item.step());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6e5c341b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 075df19..9ad0935 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.beam.sdk.metrics;
 
-import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static 
org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static 
org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertNull;
@@ -29,7 +30,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.UsesMetrics;
+import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
+import org.apache.beam.sdk.testing.UsesCommittedMetrics;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -108,14 +110,59 @@ public class MetricsTest implements Serializable {
     assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
   }
 
-  @Category({RunnableOnService.class, UsesMetrics.class})
+  @Category({RunnableOnService.class, UsesCommittedMetrics.class})
   @Test
-  public void metricsReportToQuery() {
+  public void committedMetricsReportToQuery() {
+    PipelineResult result = runPipelineWithMetrics();
+
+    MetricQueryResults metrics = 
result.metrics().queryMetrics(MetricsFilter.builder()
+      .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
+      .build());
+
+    assertThat(metrics.counters(), hasItem(
+        committedMetricsResult(MetricsTest.class.getName(), "count", 
"MyStep1", 3L)));
+    assertThat(metrics.distributions(), hasItem(
+        committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+
+    assertThat(metrics.counters(), hasItem(
+        committedMetricsResult(MetricsTest.class.getName(), "count", 
"MyStep2", 6L)));
+    assertThat(metrics.distributions(), hasItem(
+        committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+            DistributionResult.create(52L, 6L, 5L, 13L))));
+  }
+
+
+  @Category({RunnableOnService.class, UsesAttemptedMetrics.class})
+  @Test
+  public void attemptedMetricsReportToQuery() {
+    PipelineResult result = runPipelineWithMetrics();
+
+    MetricQueryResults metrics = 
result.metrics().queryMetrics(MetricsFilter.builder()
+        .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
+        .build());
+
+    // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
+    assertThat(metrics.counters(), hasItem(
+        attemptedMetricsResult(MetricsTest.class.getName(), "count", 
"MyStep1", 3L)));
+    assertThat(metrics.distributions(), hasItem(
+        attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+
+    assertThat(metrics.counters(), hasItem(
+        attemptedMetricsResult(MetricsTest.class.getName(), "count", 
"MyStep2", 6L)));
+    assertThat(metrics.distributions(), hasItem(
+        attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+            DistributionResult.create(52L, 6L, 5L, 13L))));
+  }
+
+  private PipelineResult runPipelineWithMetrics() {
     final Counter count = Metrics.counter(MetricsTest.class, "count");
     Pipeline pipeline = TestPipeline.create();
     pipeline
         .apply(Create.of(5, 8, 13))
         .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() {
+          @SuppressWarnings("unused")
           @ProcessElement
           public void processElement(ProcessContext c) {
             Distribution values = Metrics.distribution(MetricsTest.class, 
"input");
@@ -127,6 +174,7 @@ public class MetricsTest implements Serializable {
           }
         }))
         .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() {
+          @SuppressWarnings("unused")
           @ProcessElement
           public void processElement(ProcessContext c) {
             Distribution values = Metrics.distribution(MetricsTest.class, 
"input");
@@ -137,23 +185,6 @@ public class MetricsTest implements Serializable {
     PipelineResult result = pipeline.run();
 
     result.waitUntilFinish();
-
-    MetricQueryResults metrics = 
result.metrics().queryMetrics(MetricsFilter.builder()
-      .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
-      .build());
-    // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
-    assertThat(metrics.counters(), hasItem(
-        metricResult(MetricsTest.class.getName(), "count", "MyStep1", 3L, 
3L)));
-    assertThat(metrics.distributions(), hasItem(
-        metricResult(MetricsTest.class.getName(), "input", "MyStep1",
-            DistributionResult.create(26L, 3L, 5L, 13L),
-            DistributionResult.create(26L, 3L, 5L, 13L))));
-
-    assertThat(metrics.counters(), hasItem(
-        metricResult(MetricsTest.class.getName(), "count", "MyStep2", 6L, 
6L)));
-    assertThat(metrics.distributions(), hasItem(
-        metricResult(MetricsTest.class.getName(), "input", "MyStep2",
-            DistributionResult.create(52L, 6L, 5L, 13L),
-            DistributionResult.create(52L, 6L, 5L, 13L))));
+    return result;
   }
 }

Reply via email to