echauchot commented on code in PR #22157:
URL: https://github.com/apache/beam/pull/22157#discussion_r924263799
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators
namedAggregators) {
return new AggregatorMetric(namedAggregators);
}
Review Comment:
remove `final` in the parameters, as in the other AggregatorMetric class.
I agree with this removal: final parameters are not really a convention in
Java. Otherwise we would end up with final parameters in all methods.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link
com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link
com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String,
Gauge<Double>>> {
Review Comment:
This class can be package-private (package local).
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ MetricQueryResults allMetrics = metricResults.allMetrics();
+ for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+ putFiltered(metrics, filter, renderName(prefix, metricResult),
metricResult.getAttempted());
}
- for (MetricResult<DistributionResult> metricResult :
metricQueryResults.getDistributions()) {
+ for (MetricResult<DistributionResult> metricResult :
allMetrics.getDistributions()) {
DistributionResult result = metricResult.getAttempted();
- metrics.put(renderName(metricResult) + ".count", result.getCount());
- metrics.put(renderName(metricResult) + ".sum", result.getSum());
- metrics.put(renderName(metricResult) + ".min", result.getMin());
- metrics.put(renderName(metricResult) + ".max", result.getMax());
- metrics.put(renderName(metricResult) + ".mean", result.getMean());
+ String baseName = renderName(prefix, metricResult);
+ putFiltered(metrics, filter, baseName + ".count", result.getCount());
+ putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+ putFiltered(metrics, filter, baseName + ".min", result.getMin());
+ putFiltered(metrics, filter, baseName + ".max", result.getMax());
+ putFiltered(metrics, filter, baseName + ".mean", result.getMean());
}
- for (MetricResult<GaugeResult> metricResult :
metricQueryResults.getGauges()) {
- metrics.put(renderName(metricResult),
metricResult.getAttempted().getValue());
+ for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+ putFiltered(
+ metrics,
+ filter,
+ renderName(prefix, metricResult),
+ metricResult.getAttempted().getValue());
}
return metrics;
}
- Map<String, ?> renderAll() {
- MetricResults metricResults =
- asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
- return renderAll(metricResults);
- }
-
@VisibleForTesting
- static String renderName(MetricResult<?> metricResult) {
+ @SuppressWarnings("nullness") // ok to have nullable elements on stream
+ static String renderName(String prefix, MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
- String step = key.stepName();
-
- ArrayList<String> pieces = new ArrayList<>();
+ return Streams.concat(
+ Stream.of(prefix), // prefix is not cleaned, should it be?
+ Stream.of(stripSuffix(cleanPart(key.stepName()))),
Review Comment:
keep the `step` variable because it has a meaning for Beam (a step in the
pipeline, i.e. a transform); cf. the [metrics
pres](https://www.slideshare.net/slideshow/embed_code/key/kKJRzR8HxkxLsR),
slides 17 and 26.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ MetricQueryResults allMetrics = metricResults.allMetrics();
+ for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+ putFiltered(metrics, filter, renderName(prefix, metricResult),
metricResult.getAttempted());
}
- for (MetricResult<DistributionResult> metricResult :
metricQueryResults.getDistributions()) {
+ for (MetricResult<DistributionResult> metricResult :
allMetrics.getDistributions()) {
DistributionResult result = metricResult.getAttempted();
- metrics.put(renderName(metricResult) + ".count", result.getCount());
- metrics.put(renderName(metricResult) + ".sum", result.getSum());
- metrics.put(renderName(metricResult) + ".min", result.getMin());
- metrics.put(renderName(metricResult) + ".max", result.getMax());
- metrics.put(renderName(metricResult) + ".mean", result.getMean());
+ String baseName = renderName(prefix, metricResult);
+ putFiltered(metrics, filter, baseName + ".count", result.getCount());
+ putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+ putFiltered(metrics, filter, baseName + ".min", result.getMin());
+ putFiltered(metrics, filter, baseName + ".max", result.getMax());
+ putFiltered(metrics, filter, baseName + ".mean", result.getMean());
}
- for (MetricResult<GaugeResult> metricResult :
metricQueryResults.getGauges()) {
- metrics.put(renderName(metricResult),
metricResult.getAttempted().getValue());
+ for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+ putFiltered(
+ metrics,
+ filter,
+ renderName(prefix, metricResult),
+ metricResult.getAttempted().getValue());
}
return metrics;
}
- Map<String, ?> renderAll() {
- MetricResults metricResults =
- asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
- return renderAll(metricResults);
- }
-
@VisibleForTesting
- static String renderName(MetricResult<?> metricResult) {
+ @SuppressWarnings("nullness") // ok to have nullable elements on stream
+ static String renderName(String prefix, MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
- String step = key.stepName();
-
- ArrayList<String> pieces = new ArrayList<>();
+ return Streams.concat(
+ Stream.of(prefix), // prefix is not cleaned, should it be?
+ Stream.of(stripSuffix(cleanPart(key.stepName()))),
+ Stream.of(name.getNamespace(),
name.getName()).map(SparkBeamMetric::cleanPart))
+ .filter(not(Strings::isNullOrEmpty))
+ .collect(Collectors.joining("."));
Review Comment:
clearer than previous code, thanks !
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java:
##########
@@ -39,26 +42,24 @@ public InMemoryMetrics(
internalMetricRegistry = metricRegistry;
}
- @SuppressWarnings("TypeParameterUnusedInFormals")
- public static <T> T valueOf(final String name) {
- final T retVal;
+ // Constructor for Spark >= 3.2
+ @SuppressWarnings("UnusedParameters")
+ public InMemoryMetrics(final Properties properties, final MetricRegistry
metricRegistry) {
+ extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
+ internalMetricRegistry = metricRegistry;
+ }
+ @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
+ public static <T> T valueOf(final String name) {
// this might fail in case we have multiple aggregators with the same
suffix after
// the last dot, but it should be good enough for tests.
- if (extendedMetricsRegistry != null
- && extendedMetricsRegistry.getGauges().keySet().stream()
- .anyMatch(Predicates.containsPattern(name + "$")::apply)) {
- String key =
- extendedMetricsRegistry.getGauges().keySet().stream()
- .filter(Predicates.containsPattern(name + "$")::apply)
- .findFirst()
- .get();
- retVal = (T) extendedMetricsRegistry.getGauges().get(key).getValue();
+ if (extendedMetricsRegistry != null) {
+ Collection<Gauge> matches =
+ extendedMetricsRegistry.getGauges((n, m) ->
n.endsWith(name)).values();
Review Comment:
simpler code thanks
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -76,9 +67,10 @@ public void testInBatchMode() throws Exception {
.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()))
.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNTS);
- pipeline.run();
+ pipeline.run().waitUntilFinish();
Review Comment:
IIRC, in test mode the pipeline default used to be `waitUntilFinish` so that
we have a blocking call to assert afterwards. Is that no longer the case?
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java:
##########
@@ -27,6 +27,7 @@
/** Test SparkBeamMetric. */
public class SparkBeamMetricTest {
Review Comment:
Why is there no SparkBeamMetricTest for the SS runner? Ah, I get it: it is
the BeamMetricTest class. Can you rename BeamMetricTest to SparkBeamMetricTest,
as the class it tests is SparkBeamMetric?
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link
com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link
com.codahale.metrics.Metric
+ * Metrics}.
Review Comment:
```suggestion
* Recent versions of Dropwizard {@link com.codahale.metrics.MetricRegistry
MetricRegistry}
* do not allow registering arbitrary implementations of {@link
com.codahale.metrics.Metric
* Metrics}. So this class is a Beam metrics registry implemented as a
Dropwizard {@link Gauge} for use by spark engine.
```
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
Review Comment:
So now all the Beam metrics are stored in Dropwizard `Gauge<Double>`?
Are you sure that all the types of Beam metrics (`Counter`, `Gauge`, `Histogram`
and `Distribution`) are compatible with Dropwizard `Gauge<Double>`? I guess it
works, as the Distribution content is extracted into 5 entries as before. We
will see in the tests anyway.
But there is still no support for histograms (though adding this feature was
not the aim of the PR).
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ MetricQueryResults allMetrics = metricResults.allMetrics();
+ for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+ putFiltered(metrics, filter, renderName(prefix, metricResult),
metricResult.getAttempted());
}
- for (MetricResult<DistributionResult> metricResult :
metricQueryResults.getDistributions()) {
+ for (MetricResult<DistributionResult> metricResult :
allMetrics.getDistributions()) {
DistributionResult result = metricResult.getAttempted();
- metrics.put(renderName(metricResult) + ".count", result.getCount());
- metrics.put(renderName(metricResult) + ".sum", result.getSum());
- metrics.put(renderName(metricResult) + ".min", result.getMin());
- metrics.put(renderName(metricResult) + ".max", result.getMax());
- metrics.put(renderName(metricResult) + ".mean", result.getMean());
+ String baseName = renderName(prefix, metricResult);
+ putFiltered(metrics, filter, baseName + ".count", result.getCount());
+ putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+ putFiltered(metrics, filter, baseName + ".min", result.getMin());
+ putFiltered(metrics, filter, baseName + ".max", result.getMax());
+ putFiltered(metrics, filter, baseName + ".mean", result.getMean());
}
- for (MetricResult<GaugeResult> metricResult :
metricQueryResults.getGauges()) {
- metrics.put(renderName(metricResult),
metricResult.getAttempted().getValue());
+ for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+ putFiltered(
+ metrics,
+ filter,
+ renderName(prefix, metricResult),
+ metricResult.getAttempted().getValue());
}
return metrics;
}
- Map<String, ?> renderAll() {
- MetricResults metricResults =
- asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
- return renderAll(metricResults);
- }
-
@VisibleForTesting
- static String renderName(MetricResult<?> metricResult) {
+ @SuppressWarnings("nullness") // ok to have nullable elements on stream
+ static String renderName(String prefix, MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
- String step = key.stepName();
-
- ArrayList<String> pieces = new ArrayList<>();
+ return Streams.concat(
+ Stream.of(prefix), // prefix is not cleaned, should it be?
Review Comment:
Breaking change? Will users have their Spark metrics prefixed now, or is
this abstracted by the new Spark engine?
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link
com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link
com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String,
Gauge<Double>>> {
+
+ @Override
+ public final Map<String, Gauge<Double>> getValue() {
+ return getValue("", MetricFilter.ALL);
+ }
+
+ protected abstract Map<String, Gauge<Double>> getValue(String prefix,
MetricFilter filter);
+
+ protected Gauge<Double> staticGauge(Number number) {
+ return new StaticGauge(number.doubleValue());
+ }
+
+ private static class StaticGauge implements Gauge<Double> {
Review Comment:
We always make nested classes static when possible to avoid serialization
issues, so this name is not very relevant; consider renaming it to `DoubleGauge`.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/BeamMetricSet.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricFilter;
+import java.util.Map;
+
+/**
+ * Map of Beam metrics available from {@link Gauge#getValue()}.
+ *
+ * <p>Note: Recent versions of Dropwizard {@link
com.codahale.metrics.MetricRegistry MetricRegistry}
+ * do not allow registering arbitrary implementations of {@link
com.codahale.metrics.Metric
+ * Metrics}.
+ */
+public abstract class BeamMetricSet implements Gauge<Map<String,
Gauge<Double>>> {
Review Comment:
This class can be package-private (package local).
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ MetricQueryResults allMetrics = metricResults.allMetrics();
+ for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+ putFiltered(metrics, filter, renderName(prefix, metricResult),
metricResult.getAttempted());
}
- for (MetricResult<DistributionResult> metricResult :
metricQueryResults.getDistributions()) {
+ for (MetricResult<DistributionResult> metricResult :
allMetrics.getDistributions()) {
DistributionResult result = metricResult.getAttempted();
- metrics.put(renderName(metricResult) + ".count", result.getCount());
- metrics.put(renderName(metricResult) + ".sum", result.getSum());
- metrics.put(renderName(metricResult) + ".min", result.getMin());
- metrics.put(renderName(metricResult) + ".max", result.getMax());
- metrics.put(renderName(metricResult) + ".mean", result.getMean());
+ String baseName = renderName(prefix, metricResult);
+ putFiltered(metrics, filter, baseName + ".count", result.getCount());
+ putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+ putFiltered(metrics, filter, baseName + ".min", result.getMin());
+ putFiltered(metrics, filter, baseName + ".max", result.getMax());
+ putFiltered(metrics, filter, baseName + ".mean", result.getMean());
}
- for (MetricResult<GaugeResult> metricResult :
metricQueryResults.getGauges()) {
- metrics.put(renderName(metricResult),
metricResult.getAttempted().getValue());
+ for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+ putFiltered(
+ metrics,
+ filter,
+ renderName(prefix, metricResult),
+ metricResult.getAttempted().getValue());
}
return metrics;
}
- Map<String, ?> renderAll() {
- MetricResults metricResults =
- asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
- return renderAll(metricResults);
- }
-
@VisibleForTesting
- static String renderName(MetricResult<?> metricResult) {
+ @SuppressWarnings("nullness") // ok to have nullable elements on stream
+ static String renderName(String prefix, MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
- String step = key.stepName();
-
- ArrayList<String> pieces = new ArrayList<>();
+ return Streams.concat(
+ Stream.of(prefix), // prefix is not cleaned, should it be?
+ Stream.of(stripSuffix(cleanPart(key.stepName()))),
+ Stream.of(name.getNamespace(),
name.getName()).map(SparkBeamMetric::cleanPart))
+ .filter(not(Strings::isNullOrEmpty))
+ .collect(Collectors.joining("."));
+ }
- if (step != null) {
- step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
- if (step.endsWith("_")) {
- step = step.substring(0, step.length() - 1);
- }
- pieces.add(step);
- }
+ private static @Nullable String cleanPart(@Nullable String str) {
Review Comment:
Nitpick: I would rename it to `normalizeString` or something similar.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators
namedAggregators) {
return new AggregatorMetric(namedAggregators);
}
- NamedAggregators getNamedAggregators() {
- return namedAggregators;
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet())
{
+ String name = prefix + "." + entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
Review Comment:
I'm not a big fan of this coding style. I would prefer
```
if (rawValue != null) {
try {
...
}
}
```
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleGraphiteSink.java:
##########
@@ -21,14 +21,69 @@
import java.util.Properties;
import
org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
import
org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
+import org.apache.spark.SecurityManager;
import org.apache.spark.metrics.sink.Sink;
-/** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric}
metrics to Graphite. */
-public class CodahaleGraphiteSink extends
org.apache.spark.metrics.sink.GraphiteSink {
+/**
+ * A {@link Sink} for <a
href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to
Graphite.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ *
"spark.metrics.conf.*.sink.graphite.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleGraphiteSink"
+ * "spark.metrics.conf.*.sink.graphite.host"="<graphite_hostname>"
+ * "spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
+ * "spark.metrics.conf.*.sink.graphite.period"=10
+ * "spark.metrics.conf.*.sink.graphite.unit"=seconds
+ * "spark.metrics.conf.*.sink.graphite.prefix"="<optional_prefix>"
+ *
"spark.metrics.conf.*.sink.graphite.regex"="<optional_regex_to_send_matching_metrics>"
+ * }</pre>
+ */
+public class CodahaleGraphiteSink implements Sink {
Review Comment:
Why not call it GraphiteSink like in the non-structuredstreaming runner ?
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java:
##########
@@ -33,61 +37,71 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
/**
- * An adapter between the {@link MetricsContainerStepMap} and Codahale's
{@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and the Dropwizard
{@link Metric}
+ * interface.
*/
-public class SparkBeamMetric implements Metric {
+public class SparkBeamMetric extends BeamMetricSet {
+
private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
- static Map<String, ?> renderAll(MetricResults metricResults) {
- Map<String, Object> metrics = new HashMap<>();
- MetricQueryResults metricQueryResults = metricResults.allMetrics();
- for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
- metrics.put(renderName(metricResult), metricResult.getAttempted());
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ MetricResults metricResults =
+ asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ MetricQueryResults allMetrics = metricResults.allMetrics();
+ for (MetricResult<Long> metricResult : allMetrics.getCounters()) {
+ putFiltered(metrics, filter, renderName(prefix, metricResult),
metricResult.getAttempted());
}
- for (MetricResult<DistributionResult> metricResult :
metricQueryResults.getDistributions()) {
+ for (MetricResult<DistributionResult> metricResult :
allMetrics.getDistributions()) {
DistributionResult result = metricResult.getAttempted();
- metrics.put(renderName(metricResult) + ".count", result.getCount());
- metrics.put(renderName(metricResult) + ".sum", result.getSum());
- metrics.put(renderName(metricResult) + ".min", result.getMin());
- metrics.put(renderName(metricResult) + ".max", result.getMax());
- metrics.put(renderName(metricResult) + ".mean", result.getMean());
+ String baseName = renderName(prefix, metricResult);
+ putFiltered(metrics, filter, baseName + ".count", result.getCount());
+ putFiltered(metrics, filter, baseName + ".sum", result.getSum());
+ putFiltered(metrics, filter, baseName + ".min", result.getMin());
+ putFiltered(metrics, filter, baseName + ".max", result.getMax());
+ putFiltered(metrics, filter, baseName + ".mean", result.getMean());
}
- for (MetricResult<GaugeResult> metricResult :
metricQueryResults.getGauges()) {
- metrics.put(renderName(metricResult),
metricResult.getAttempted().getValue());
+ for (MetricResult<GaugeResult> metricResult : allMetrics.getGauges()) {
+ putFiltered(
+ metrics,
+ filter,
+ renderName(prefix, metricResult),
+ metricResult.getAttempted().getValue());
}
return metrics;
}
- Map<String, ?> renderAll() {
- MetricResults metricResults =
- asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
- return renderAll(metricResults);
- }
-
@VisibleForTesting
- static String renderName(MetricResult<?> metricResult) {
+ @SuppressWarnings("nullness") // ok to have nullable elements on stream
+ static String renderName(String prefix, MetricResult<?> metricResult) {
MetricKey key = metricResult.getKey();
MetricName name = key.metricName();
- String step = key.stepName();
-
- ArrayList<String> pieces = new ArrayList<>();
+ return Streams.concat(
+ Stream.of(prefix), // prefix is not cleaned, should it be?
+ Stream.of(stripSuffix(cleanPart(key.stepName()))),
+ Stream.of(name.getNamespace(),
name.getName()).map(SparkBeamMetric::cleanPart))
+ .filter(not(Strings::isNullOrEmpty))
+ .collect(Collectors.joining("."));
+ }
- if (step != null) {
- step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
- if (step.endsWith("_")) {
- step = step.substring(0, step.length() - 1);
- }
- pieces.add(step);
- }
+ private static @Nullable String cleanPart(@Nullable String str) {
+ return str != null ? str.replaceAll(ILLEGAL_CHARACTERS, "_") : null;
+ }
- pieces.addAll(
- ImmutableList.of(name.getNamespace(), name.getName()).stream()
- .map(str -> str.replaceAll(ILLEGAL_CHARACTERS, "_"))
- .collect(toList()));
+ private static @Nullable String stripSuffix(@Nullable String str) {
+ return str != null && str.endsWith("_") ? str.substring(0, str.length() -
1) : str;
+ }
- return String.join(".", pieces);
+ private void putFiltered(
Review Comment:
Previously the metrics were not filtered; how does this work with the
DropWizard filters?
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators
namedAggregators) {
return new AggregatorMetric(namedAggregators);
}
- NamedAggregators getNamedAggregators() {
- return namedAggregators;
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet())
{
+ String name = prefix + "." + entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
+ }
+ try {
+ Gauge<Double> gauge = staticGauge(rawValue);
+ if (filter.matches(name, gauge)) {
+ metrics.put(name, gauge);
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "Metric `{}` of type {} can't be reported, conversion to double
failed.",
+ name,
+ rawValue.getClass().getSimpleName(),
+ e);
+ }
+ }
+ return metrics;
+ }
+
+ // Metric type is assumed to be compatible with Double
Review Comment:
Let me get this clear: with the new DropWizard version you can no longer create a
custom metrics registry, so you use a DropWizard Gauge to act as a registry
(`Gauge<Map<String, Gauge<Double>>>`). Every type of Beam metric can be
stored in a DropWizard `Gauge<Double>`.
See also the similar comment in the other subclass of BeamMetricSet.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java:
##########
@@ -88,95 +71,21 @@ public SortedMap<String, Counter> getCounters(final
MetricFilter filter) {
@Override
public SortedMap<String, Gauge> getGauges(final MetricFilter filter) {
- return new ImmutableSortedMap.Builder<String, Gauge>(
- Ordering.from(String.CASE_INSENSITIVE_ORDER))
- .putAll(internalMetricRegistry.getGauges(filter))
- .putAll(extractGauges(internalMetricRegistry, filter))
- .build();
- }
-
- private Map<String, Gauge> extractGauges(
- final MetricRegistry metricRegistry, final MetricFilter filter) {
- Map<String, Gauge> gauges = new HashMap<>();
-
- // find the AggregatorMetric metrics from within all currently registered
metrics
- final Optional<Map<String, Gauge>> aggregatorMetrics =
- FluentIterable.from(metricRegistry.getMetrics().entrySet())
- .firstMatch(isAggregatorMetric())
- .transform(aggregatorMetricToGauges());
-
- // find the SparkBeamMetric metrics from within all currently registered
metrics
- final Optional<Map<String, Gauge>> beamMetrics =
- FluentIterable.from(metricRegistry.getMetrics().entrySet())
- .firstMatch(isSparkBeamMetric())
- .transform(beamMetricToGauges());
-
- if (aggregatorMetrics.isPresent()) {
- gauges.putAll(Maps.filterEntries(aggregatorMetrics.get(),
matches(filter)));
- }
-
- if (beamMetrics.isPresent()) {
- gauges.putAll(Maps.filterEntries(beamMetrics.get(), matches(filter)));
- }
-
- return gauges;
- }
-
- private Function<Map.Entry<String, Metric>, Map<String, Gauge>>
aggregatorMetricToGauges() {
- return entry -> {
- final NamedAggregators agg = ((AggregatorMetric)
entry.getValue()).getNamedAggregators();
- final String parentName = entry.getKey();
- final Map<String, Gauge> gaugeMap =
Maps.transformEntries(agg.renderAll(), toGauge());
- final Map<String, Gauge> fullNameGaugeMap = Maps.newLinkedHashMap();
- for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
- fullNameGaugeMap.put(parentName + "." + gaugeEntry.getKey(),
gaugeEntry.getValue());
+ ImmutableSortedMap.Builder<String, Gauge> builder =
+ new
ImmutableSortedMap.Builder<>(Ordering.from(String.CASE_INSENSITIVE_ORDER));
+
+ Map<String, Gauge> gauges =
+ internalMetricRegistry.getGauges(
+ (n, m) -> filter.matches(n, m) || m instanceof BeamMetricSet);
+
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ Gauge gauge = entry.getValue();
+ if (gauge instanceof BeamMetricSet) {
+ builder.putAll(((BeamMetricSet) gauge).getValue(entry.getKey(),
filter));
+ } else {
+ builder.put(entry.getKey(), gauge);
Review Comment:
Way more maintainable code, thanks!
If the tests cover enough and pass, we are all good.
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink
in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least
c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in
batch mode. */
@RunWith(JUnit4.class)
public class SparkMetricsSinkTest {
+
+ @ClassRule
+ public static SparkSessionRule sessionRule =
+ new SparkSessionRule(
+ KV.of("spark.metrics.conf.*.sink.memory.class",
InMemoryMetrics.class.getName()));
Review Comment:
Better indeed to configure the context here rather than having the strange
conf file with a single entry as before. Thanks
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/CodahaleCsvSink.java:
##########
@@ -21,16 +21,66 @@
import java.util.Properties;
import
org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetric;
import
org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
+import org.apache.spark.SecurityManager;
import org.apache.spark.metrics.sink.Sink;
/**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric}
metrics to a CSV file.
+ * A {@link Sink} for <a
href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a
CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ *
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.structuredstreaming.metrics.sink.CodahaleCsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
*/
-public class CodahaleCsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CodahaleCsvSink implements Sink {
Review Comment:
Why not call it CsvSink, like in the non-structured-streaming runner?
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink
in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least
c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in
batch mode. */
@RunWith(JUnit4.class)
public class SparkMetricsSinkTest {
+
+ @ClassRule
+ public static SparkSessionRule sessionRule =
+ new SparkSessionRule(
+ KV.of("spark.metrics.conf.*.sink.memory.class",
InMemoryMetrics.class.getName()));
+
@Rule public ExternalResource inMemoryMetricsSink = new
InMemoryMetricsSinkRule();
+ @Rule
+ public TestPipeline pipeline =
TestPipeline.fromOptions(sessionRule.createPipelineOptions());
Review Comment:
IIRC the runner, test mode, etc. are configured in
`sessionRule#createPipelineOptions`.
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink
in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least
c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in
batch mode. */
@RunWith(JUnit4.class)
Review Comment:
I think the `@RunWith(JUnit4.class)` annotation can be removed.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java:
##########
@@ -33,7 +41,35 @@ public static AggregatorMetric of(final NamedAggregators
namedAggregators) {
return new AggregatorMetric(namedAggregators);
}
- NamedAggregators getNamedAggregators() {
- return namedAggregators;
+ @Override
+ public Map<String, Gauge<Double>> getValue(String prefix, MetricFilter
filter) {
+ Map<String, Gauge<Double>> metrics = new HashMap<>();
+ for (Map.Entry<String, ?> entry : namedAggregators.renderAll().entrySet())
{
+ String name = prefix + "." + entry.getKey();
+ Object rawValue = entry.getValue();
+ if (rawValue == null) {
+ continue;
Review Comment:
plus same in the other aggregatormetric class
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -21,51 +21,42 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import
org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * TODO: add testInStreamingMode() once streaming support will be implemented.
- *
- * <p>A test that verifies Beam metrics are reported to Spark's metrics sink
in both batch and
- * streaming modes.
- */
-@Ignore("Has been failing since at least
c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+/** A test that verifies Beam metrics are reported to Spark's metrics sink in
batch mode. */
@RunWith(JUnit4.class)
public class SparkMetricsSinkTest {
+
+ @ClassRule
+ public static SparkSessionRule sessionRule =
+ new SparkSessionRule(
+ KV.of("spark.metrics.conf.*.sink.memory.class",
InMemoryMetrics.class.getName()));
+
@Rule public ExternalResource inMemoryMetricsSink = new
InMemoryMetricsSinkRule();
+ @Rule
+ public TestPipeline pipeline =
TestPipeline.fromOptions(sessionRule.createPipelineOptions());
Review Comment:
final
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java:
##########
@@ -27,6 +27,7 @@
/** Test SparkBeamMetric. */
public class SparkBeamMetricTest {
Review Comment:
Also, I'm not sure specifying the JUnit 4 runner in SparkBeamMetricTest is needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]