echauchot commented on code in PR #22157:
URL: https://github.com/apache/beam/pull/22157#discussion_r924263799


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators namedAggregators) {
     return new AggregatorMetric(namedAggregators);
   }

Review Comment:
   Remove `final` from the parameters, as in the other AggregatorMetric class. I agree with this removal: `final` parameters are just a stylistic convention in Java; otherwise we would end up with `final` parameters in all methods.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String, Gauge<Double>>> {

Review Comment:
   Can be package-private.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    MetricQueryResults allMetrics = metricResults.allMetrics();
+    for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+      putFiltered(metrics, filter, renderName(prefix, metricResult), metricResult.getAttempted());
     }
-    for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
+    for (MetricResult<DistributionResult> metricResult : allMetrics.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String baseName = renderName(prefix, metricResult);
+      putFiltered(metrics, filter, baseName + ".count", result.getCount());
+      putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+      putFiltered(metrics, filter, baseName + ".min", result.getMin());
+      putFiltered(metrics, filter, baseName + ".max", result.getMax());
+      putFiltered(metrics, filter, baseName + ".mean", result.getMean());
     }
-    for (MetricResult<GaugeResult> metricResult : metricQueryResults.getGauges()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted().getValue());
+    for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+      putFiltered(
+          metrics,
+          filter,
+          renderName(prefix, metricResult),
+          metricResult.getAttempted().getValue());
     }
     return metrics;
   }
 
-  Map<String, ?> renderAll() {
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
-    return renderAll(metricResults);
-  }
-
   @VisibleForTesting
-  static String renderName(MetricResult<?> metricResult) {
+  @SuppressWarnings("nullness") // ok to have nullable elements on stream
+  static String renderName(String prefix, MetricResult<?> metricResult) {
     MetricKey key = metricResult.getKey();
     MetricName name = key.metricName();
-    String step = key.stepName();
-
-    ArrayList<String> pieces = new ArrayList<>();
+    return Streams.concat(
+            Stream.of(prefix), // prefix is not cleaned, should it be?
+            Stream.of(stripSuffix(cleanPart(key.stepName()))),

Review Comment:
   Keep the `step` variable because it has a meaning for Beam (a step in the pipeline, i.e. a transform), cf. [metrics pres](https://www.slideshare.net/slideshow/embed_code/key/kKJRzR8HxkxLsR), slides 17 and 26.
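   i.e. something like this (a sketch of the same code, just naming the step explicitly):
   ```java
   // step = the pipeline transform this metric was reported in
   String step = stripSuffix(cleanPart(key.stepName()));
   return Streams.concat(
           Stream.of(prefix),
           Stream.of(step),
           Stream.of(name.getNamespace(), name.getName()).map(SparkBeamMetric::cleanPart))
       .filter(not(Strings::isNullOrEmpty))
       .collect(Collectors.joining("."));
   ```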



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    MetricQueryResults allMetrics = metricResults.allMetrics();
+    for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+      putFiltered(metrics, filter, renderName(prefix, metricResult), metricResult.getAttempted());
     }
-    for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
+    for (MetricResult<DistributionResult> metricResult : allMetrics.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String baseName = renderName(prefix, metricResult);
+      putFiltered(metrics, filter, baseName + ".count", result.getCount());
+      putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+      putFiltered(metrics, filter, baseName + ".min", result.getMin());
+      putFiltered(metrics, filter, baseName + ".max", result.getMax());
+      putFiltered(metrics, filter, baseName + ".mean", result.getMean());
     }
-    for (MetricResult<GaugeResult> metricResult : metricQueryResults.getGauges()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted().getValue());
+    for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+      putFiltered(
+          metrics,
+          filter,
+          renderName(prefix, metricResult),
+          metricResult.getAttempted().getValue());
     }
     return metrics;
   }
 
-  Map<String, ?> renderAll() {
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
-    return renderAll(metricResults);
-  }
-
   @VisibleForTesting
-  static String renderName(MetricResult<?> metricResult) {
+  @SuppressWarnings("nullness") // ok to have nullable elements on stream
+  static String renderName(String prefix, MetricResult<?> metricResult) {
     MetricKey key = metricResult.getKey();
     MetricName name = key.metricName();
-    String step = key.stepName();
-
-    ArrayList<String> pieces = new ArrayList<>();
+    return Streams.concat(
+            Stream.of(prefix), // prefix is not cleaned, should it be?
+            Stream.of(stripSuffix(cleanPart(key.stepName()))),
+            Stream.of(name.getNamespace(), name.getName()).map(SparkBeamMetric::cleanPart))
+        .filter(not(Strings::isNullOrEmpty))
+        .collect(Collectors.joining("."));

Review Comment:
   Clearer than the previous code, thanks!



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java:
##########
@@ -39,26 +42,24 @@ public InMemoryMetrics(
     internalMetricRegistry = metricRegistry;
   }
 
-  @SuppressWarnings("TypeParameterUnusedInFormals")
-  public static <T> T valueOf(final String name) {
-    final T retVal;
+  // Constructor for Spark >= 3.2
+  @SuppressWarnings("UnusedParameters")
+  public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
+    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
+    internalMetricRegistry = metricRegistry;
+  }
 
+  @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
+  public static <T> T valueOf(final String name) {
      // this might fail in case we have multiple aggregators with the same suffix after
     // the last dot, but it should be good enough for tests.
-    if (extendedMetricsRegistry != null
-        && extendedMetricsRegistry.getGauges().keySet().stream()
-            .anyMatch(Predicates.containsPattern(name + "$")::apply)) {
-      String key =
-          extendedMetricsRegistry.getGauges().keySet().stream()
-              .filter(Predicates.containsPattern(name + "$")::apply)
-              .findFirst()
-              .get();
-      retVal = (T) extendedMetricsRegistry.getGauges().get(key).getValue();
+    if (extendedMetricsRegistry != null) {
+      Collection<Gauge> matches =
+          extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();

Review Comment:
   Simpler code, thanks!



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -76,9 +67,10 @@ public void testInBatchMode() throws Exception {
             .apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()))
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNTS);
-    pipeline.run();
+    pipeline.run().waitUntilFinish();

Review Comment:
   IIRC in test mode the pipeline default used to be `waitUntilFinish`, so that we have a blocking call to assert on afterwards. Is that not still the case?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java:
##########
@@ -27,6 +27,7 @@
 
 /** Test SparkBeamMetric. */
 public class SparkBeamMetricTest {

Review Comment:
   Why is there no SparkBeamMetricTest for the SS runner? Ah, I get it: it is the BeamMetricTest class. Can you rename BeamMetricTest to SparkBeamMetricTest, as the class it tests is SparkBeamMetric?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link com.codahale.metrics.Metric
+ * Metrics}.

Review Comment:
   ```suggestion
    * Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry MetricRegistry}
    * do not allow registering arbitrary implementations of {@link com.codahale.metrics.Metric
    * Metrics}. So this class is a Beam metrics registry implemented as a Dropwizard {@link Gauge}
    * for use by the Spark engine.
   ```



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();

Review Comment:
   So now all the Beam metrics are stored in Dropwizard `Gauge<Double>`? Are you sure that all the types of Beam metrics (`Counter`, `Gauge`, `Histogram` and `Distribution`) are compatible with Dropwizard `Gauge<Double>`? I guess it works since the `Distribution` content is extracted into 5 entries as before. We will see in the tests anyway.
   
   But still no support for histograms (though adding this feature was not the aim of the PR).
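   For my own understanding, a minimal sketch of why any Beam metric value fits (names are hypothetical, `Gauge` is Dropwizard's single-method interface):
   ```java
   import com.codahale.metrics.Gauge;

   public class GaugeSketch {
     public static void main(String[] args) {
       // any Beam metric value (Long counter, Double gauge, distribution stats) is a Number,
       // so it can be exposed lazily through a Dropwizard Gauge<Double>
       Number counterValue = 42L; // e.g. an attempted Beam counter value
       Gauge<Double> asDouble = () -> counterValue.doubleValue();
       System.out.println(asDouble.getValue()); // prints 42.0
     }
   }
   ```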



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    MetricQueryResults allMetrics = metricResults.allMetrics();
+    for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+      putFiltered(metrics, filter, renderName(prefix, metricResult), metricResult.getAttempted());
     }
-    for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
+    for (MetricResult<DistributionResult> metricResult : allMetrics.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String baseName = renderName(prefix, metricResult);
+      putFiltered(metrics, filter, baseName + ".count", result.getCount());
+      putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+      putFiltered(metrics, filter, baseName + ".min", result.getMin());
+      putFiltered(metrics, filter, baseName + ".max", result.getMax());
+      putFiltered(metrics, filter, baseName + ".mean", result.getMean());
     }
-    for (MetricResult<GaugeResult> metricResult : metricQueryResults.getGauges()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted().getValue());
+    for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+      putFiltered(
+          metrics,
+          filter,
+          renderName(prefix, metricResult),
+          metricResult.getAttempted().getValue());
     }
     return metrics;
   }
 
-  Map<String, ?> renderAll() {
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
-    return renderAll(metricResults);
-  }
-
   @VisibleForTesting
-  static String renderName(MetricResult<?> metricResult) {
+  @SuppressWarnings("nullness") // ok to have nullable elements on stream
+  static String renderName(String prefix, MetricResult<?> metricResult) {
     MetricKey key = metricResult.getKey();
     MetricName name = key.metricName();
-    String step = key.stepName();
-
-    ArrayList<String> pieces = new ArrayList<>();
+    return Streams.concat(
+            Stream.of(prefix), // prefix is not cleaned, should it be?

Review Comment:
   Breaking change? Will users now have their Spark metrics prefixed, or is this abstracted by the new Spark engine?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String, Gauge<Double>>> {
+
+  @Override
+  public final Map<String, Gauge<Double>> getValue() {
+    return getValue("", MetricFilter.ALL);
+  }
+
+  protected abstract Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter);
+
+  protected Gauge<Double> staticGauge(Number number) {
+    return new StaticGauge(number.doubleValue());
+  }
+
+  private static class StaticGauge implements Gauge<Double> {

Review Comment:
   We always make nested classes static when possible to avoid serialization issues, so this name is not very relevant; consider renaming it to `DoubleGauge`.
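   e.g. something like this (a sketch; the body is inferred from `staticGauge` above):
   ```java
   // a static nested class holding a precomputed double value
   private static class DoubleGauge implements Gauge<Double> {
     private final double value;

     DoubleGauge(double value) {
       this.value = value;
     }

     @Override
     public Double getValue() {
       return value;
     }
   }
   ```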



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String, Gauge<Double>>> {

Review Comment:
   Can be package-private.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    MetricQueryResults allMetrics = metricResults.allMetrics();
+    for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+      putFiltered(metrics, filter, renderName(prefix, metricResult), metricResult.getAttempted());
     }
-    for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
+    for (MetricResult<DistributionResult> metricResult : allMetrics.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String baseName = renderName(prefix, metricResult);
+      putFiltered(metrics, filter, baseName + ".count", result.getCount());
+      putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+      putFiltered(metrics, filter, baseName + ".min", result.getMin());
+      putFiltered(metrics, filter, baseName + ".max", result.getMax());
+      putFiltered(metrics, filter, baseName + ".mean", result.getMean());
     }
-    for (MetricResult<GaugeResult> metricResult : metricQueryResults.getGauges()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted().getValue());
+    for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+      putFiltered(
+          metrics,
+          filter,
+          renderName(prefix, metricResult),
+          metricResult.getAttempted().getValue());
     }
     return metrics;
   }
 
-  Map<String, ?> renderAll() {
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
-    return renderAll(metricResults);
-  }
-
   @VisibleForTesting
-  static String renderName(MetricResult<?> metricResult) {
+  @SuppressWarnings("nullness") // ok to have nullable elements on stream
+  static String renderName(String prefix, MetricResult<?> metricResult) {
     MetricKey key = metricResult.getKey();
     MetricName name = key.metricName();
-    String step = key.stepName();
-
-    ArrayList<String> pieces = new ArrayList<>();
+    return Streams.concat(
+            Stream.of(prefix), // prefix is not cleaned, should it be?
+            Stream.of(stripSuffix(cleanPart(key.stepName()))),
+            Stream.of(name.getNamespace(), name.getName()).map(SparkBeamMetric::cleanPart))
+        .filter(not(Strings::isNullOrEmpty))
+        .collect(Collectors.joining("."));
+  }
 
-    if (step != null) {
-      step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
-      if (step.endsWith("_")) {
-        step = step.substring(0, step.length() - 1);
-      }
-      pieces.add(step);
-    }
+  private static @Nullable String cleanPart(@Nullable String str) {

Review Comment:
   Nitpicking: I would rename this to `normalizeString` or something similar.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators namedAggregators) {
     return new AggregatorMetric(namedAggregators);
   }
 
-  NamedAggregators getNamedAggregators() {
-    return namedAggregators;
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet()) {
+      String name = prefix + "." + entry.getKey();
+      Object rawValue = entry.getValue();
+      if (rawValue == null) {
+        continue;

Review Comment:
   I'm not a big fan of this coding style. I would prefer:
   ```java
   if (rawValue != null) {
     try {
       ...
     } catch (NumberFormatException e) {
       ...
     }
   }
   ```



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleGraphiteSink.java:
##########
@@ -21,14 +21,69 @@
 import java.util.Properties;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
+import org.apache.spark.SecurityManager;
 import org.apache.spark.metrics.sink.Sink;
 
-/** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to Graphite. */
-public class CodahaleGraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
+/**
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to Graphite.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.graphite.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleGraphiteSink"
+ * "spark.metrics.conf.*.sink.graphite.host"="<graphite_hostname>"
+ * "spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
+ * "spark.metrics.conf.*.sink.graphite.period"=10
+ * "spark.metrics.conf.*.sink.graphite.unit"=seconds
+ * "spark.metrics.conf.*.sink.graphite.prefix"="<optional_prefix>"
+ * "spark.metrics.conf.*.sink.graphite.regex"="<optional_regex_to_send_matching_metrics>"
+ * }</pre>
+ */
+public class CodahaleGraphiteSink implements Sink {

Review Comment:
   Why not call it GraphiteSink, like in the non-structured-streaming runner?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
 
 /**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard {@link Metric}
+ * interface.
  */
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
-  static Map<String, ?> renderAll(MetricResults metricResults) {
-    Map<String, Object> metrics = new HashMap<>();
-    MetricQueryResults metricQueryResults = metricResults.allMetrics();
-    for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted());
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    MetricQueryResults allMetrics = metricResults.allMetrics();
+    for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+      putFiltered(metrics, filter, renderName(prefix, metricResult), metricResult.getAttempted());
     }
-    for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
+    for (MetricResult<DistributionResult> metricResult : allMetrics.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String baseName = renderName(prefix, metricResult);
+      putFiltered(metrics, filter, baseName + ".count", result.getCount());
+      putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+      putFiltered(metrics, filter, baseName + ".min", result.getMin());
+      putFiltered(metrics, filter, baseName + ".max", result.getMax());
+      putFiltered(metrics, filter, baseName + ".mean", result.getMean());
     }
-    for (MetricResult<GaugeResult> metricResult : metricQueryResults.getGauges()) {
-      metrics.put(renderName(metricResult), metricResult.getAttempted().getValue());
+    for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+      putFiltered(
+          metrics,
+          filter,
+          renderName(prefix, metricResult),
+          metricResult.getAttempted().getValue());
     }
     return metrics;
   }
 
-  Map<String, ?> renderAll() {
-    MetricResults metricResults =
-        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
-    return renderAll(metricResults);
-  }
-
   @VisibleForTesting
-  static String renderName(MetricResult<?> metricResult) {
+  @SuppressWarnings("nullness") // ok to have nullable elements on stream
+  static String renderName(String prefix, MetricResult<?> metricResult) {
     MetricKey key = metricResult.getKey();
     MetricName name = key.metricName();
-    String step = key.stepName();
-
-    ArrayList<String> pieces = new ArrayList<>();
+    return Streams.concat(
+            Stream.of(prefix), // prefix is not cleaned, should it be?
+            Stream.of(stripSuffix(cleanPart(key.stepName()))),
+            Stream.of(name.getNamespace(), name.getName()).map(SparkBeamMetric::cleanPart))
+        .filter(not(Strings::isNullOrEmpty))
+        .collect(Collectors.joining("."));
+  }
 
-    if (step != null) {
-      step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
-      if (step.endsWith("_")) {
-        step = step.substring(0, step.length() - 1);
-      }
-      pieces.add(step);
-    }
+  private static @Nullable String cleanPart(@Nullable String str) {
+    return str != null ? str.replaceAll(ILLEGAL_CHARACTERS, "_") : null;
+  }
 
-    pieces.addAll(
-        ImmutableList.of(name.getNamespace(), name.getName()).stream()
-            .map(str -> str.replaceAll(ILLEGAL_CHARACTERS, "_"))
-            .collect(toList()));
+  private static @Nullable String stripSuffix(@Nullable String str) {
+    return str != null && str.endsWith("_") ? str.substring(0, str.length() - 1) : str;
+  }
 
-    return String.join(".", pieces);
+  private void putFiltered(

Review Comment:
   Previously the metrics were not filtered; how does this work with the Dropwizard filters?
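   For reference, my understanding is that a sink without a configured filter passes `MetricFilter.ALL`, which matches every metric, so the previous unfiltered behavior should be preserved. A minimal sketch (hypothetical class name):
   ```java
   import com.codahale.metrics.Gauge;
   import com.codahale.metrics.MetricFilter;

   public class FilterSketch {
     public static void main(String[] args) {
       Gauge<Double> gauge = () -> 1.0;
       // MetricFilter.ALL accepts any (name, metric) pair
       System.out.println(MetricFilter.ALL.matches("beam.some.metric", gauge)); // true
     }
   }
   ```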



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators namedAggregators) {
     return new AggregatorMetric(namedAggregators);
   }
 
-  NamedAggregators getNamedAggregators() {
-    return namedAggregators;
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet()) {
+      String name = prefix + "." + entry.getKey();
+      Object rawValue = entry.getValue();
+      if (rawValue == null) {
+        continue;
+      }
+      try {
+        Gauge<Double> gauge = staticGauge(rawValue);
+        if (filter.matches(name, gauge)) {
+          metrics.put(name, gauge);
+        }
+      } catch (NumberFormatException e) {
+        LOG.warn(
+            "Metric `{}` of type {} can't be reported, conversion to double failed.",
+            name,
+            rawValue.getClass().getSimpleName(),
+            e);
+      }
+    }
+    return metrics;
+  }
+
+  // Metric type is assumed to be compatible with Double

Review Comment:
   Let me get this clear: with the new Dropwizard version you can no longer create a custom metrics registry, so you use a Dropwizard Gauge to make a registry (`Gauge<Map<String, Gauge<Double>>>`), and every type of Beam metric can be stored in a Dropwizard `Gauge<Double>`.
   
   See also the similar comment in the other subclass of BeamMetricSet.
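   i.e., if I understand correctly, the trick is roughly this (a minimal sketch, hypothetical names):
   ```java
   import com.codahale.metrics.Gauge;
   import java.util.HashMap;
   import java.util.Map;

   // a "registry" of Beam metrics disguised as a single Dropwizard metric: the outer
   // Gauge is registered once, and its value is a map of inner gauges rendered on demand
   public class RegistryAsGaugeSketch implements Gauge<Map<String, Gauge<Double>>> {
     @Override
     public Map<String, Gauge<Double>> getValue() {
       Map<String, Gauge<Double>> metrics = new HashMap<>();
       metrics.put("beam.someCounter", () -> 42.0); // a Beam counter rendered as a Double
       return metrics;
     }
   }
   ```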



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java:
##########
@@ -88,95 +71,21 @@ public SortedMap<String, Counter> getCounters(final MetricFilter filter) {
 
   @Override
   public SortedMap<String, Gauge> getGauges(final MetricFilter filter) {
-    return new ImmutableSortedMap.Builder<String, Gauge>(
-            Ordering.from(String.CASE_INSENSITIVE_ORDER))
-        .putAll(internalMetricRegistry.getGauges(filter))
-        .putAll(extractGauges(internalMetricRegistry, filter))
-        .build();
-  }
-
-  private Map<String, Gauge> extractGauges(
-      final MetricRegistry metricRegistry, final MetricFilter filter) {
-    Map<String, Gauge> gauges = new HashMap<>();
-
-    // find the AggregatorMetric metrics from within all currently registered metrics
-    final Optional<Map<String, Gauge>> aggregatorMetrics =
-        FluentIterable.from(metricRegistry.getMetrics().entrySet())
-            .firstMatch(isAggregatorMetric())
-            .transform(aggregatorMetricToGauges());
-
-    // find the SparkBeamMetric metrics from within all currently registered metrics
-    final Optional<Map<String, Gauge>> beamMetrics =
-        FluentIterable.from(metricRegistry.getMetrics().entrySet())
-            .firstMatch(isSparkBeamMetric())
-            .transform(beamMetricToGauges());
-
-    if (aggregatorMetrics.isPresent()) {
-      gauges.putAll(Maps.filterEntries(aggregatorMetrics.get(), matches(filter)));
-    }
-
-    if (beamMetrics.isPresent()) {
-      gauges.putAll(Maps.filterEntries(beamMetrics.get(), matches(filter)));
-    }
-
-    return gauges;
-  }
-
-  private Function<Map.Entry<String, Metric>, Map<String, Gauge>> aggregatorMetricToGauges() {
-    return entry -> {
-      final NamedAggregators agg = ((AggregatorMetric) entry.getValue()).getNamedAggregators();
-      final String parentName = entry.getKey();
-      final Map<String, Gauge> gaugeMap = Maps.transformEntries(agg.renderAll(), toGauge());
-      final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
-      for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
-        fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(), gaugeEntry.getValue());
+    ImmutableSortedMap.Builder<String, Gauge> builder =
+        new ImmutableSortedMap.Builder<>(Ordering.from(String.CASE_INSENSITIVE_ORDER));
+
+    Map<String, Gauge> gauges =
+        internalMetricRegistry.getGauges(
+            (n, m) -> filter.matches(n, m) || m instanceof BeamMetricSet);
+
+    for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+      Gauge gauge = entry.getValue();
+      if (gauge instanceof BeamMetricSet) {
+        builder.putAll(((BeamMetricSet) gauge).getValue(entry.getKey(), filter));
+      } else {
+        builder.put(entry.getKey(), gauge);

Review Comment:
   Way more maintainable code, thanks! If the tests cover enough and pass, we are all good.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in batch mode. */
 @RunWith(JUnit4.class)
 public class SparkMetricsSinkTest {
+
+  @ClassRule
+  public static SparkSessionRule sessionRule =
+      new SparkSessionRule(
+          KV.of("spark.metrics.conf.*.sink.memory.class", InMemoryMetrics.class.getName()));

Review Comment:
   Indeed, better to configure the context here rather than having the strange conf file with a single entry as before. Thanks!



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleCsvSink.java:
##########
@@ -21,16 +21,66 @@
 import java.util.Properties;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
+import org.apache.spark.SecurityManager;
 import org.apache.spark.metrics.sink.Sink;
 
 /**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleCsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
  */
-public class CodahaleCsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CodahaleCsvSink implements Sink {

Review Comment:
   Why not call it CsvSink, like in the non-structured-streaming runner?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in batch mode. */
 @RunWith(JUnit4.class)
 public class SparkMetricsSinkTest {
+
+  @ClassRule
+  public static SparkSessionRule sessionRule =
+      new SparkSessionRule(
+          KV.of("spark.metrics.conf.*.sink.memory.class", InMemoryMetrics.class.getName()));
+
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
+  @Rule
+  public TestPipeline pipeline = TestPipeline.fromOptions(sessionRule.createPipelineOptions());

Review Comment:
   IIRC the runner, test mode, etc. are configured in `sessionRule#createPipelineOptions`.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in batch mode. */
 @RunWith(JUnit4.class)

Review Comment:
   I think this annotation can be removed.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators namedAggregators) {
     return new AggregatorMetric(namedAggregators);
   }
 
-  NamedAggregators getNamedAggregators() {
-    return namedAggregators;
+  @Override
+  public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter filter) {
+    Map<String, Gauge<Double>> metrics = new HashMap<>();
+    for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet()) {
+      String name = prefix + "." + entry.getKey();
+      Object rawValue = entry.getValue();
+      if (rawValue == null) {
+        continue;

Review Comment:
   Plus, the same in the other AggregatorMetric class.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in batch mode. */
 @RunWith(JUnit4.class)
 public class SparkMetricsSinkTest {
+
+  @ClassRule
+  public static SparkSessionRule sessionRule =
+      new SparkSessionRule(
+          KV.of("spark.metrics.conf.*.sink.memory.class", InMemoryMetrics.class.getName()));
+
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
+  @Rule
+  public TestPipeline pipeline = TestPipeline.fromOptions(sessionRule.createPipelineOptions());

Review Comment:
   Make it `final`.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java:
##########
@@ -27,6 +27,7 @@
 
 /** Test SparkBeamMetric. */
 public class SparkBeamMetricTest {

Review Comment:
   Also, I'm not sure specifying the JUnit 4 runner in BeamMetricTest is needed.


