echauchot commented on code in PR #22157:
URL: https://github.com/apache/beam/pull/22157#discussion_r925749708
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java:
##########
@@ -18,22 +18,64 @@
package org.apache.beam.runners.spark.metrics.sink;
import com.codahale.metrics.MetricRegistry;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Constructor;
import java.util.Properties;
import org.apache.beam.runners.spark.metrics.AggregatorMetric;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
import org.apache.spark.metrics.sink.Sink;
/**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric}
metrics to a CSV file.
+ * A {@link Sink} for <a
href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a
CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ *
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
*/
-// Intentionally overriding parent name because inheritors should replace the
parent.
-@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
-public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CsvSink implements Sink {
+
+ // Initialized reflectively as done by Spark's MetricsSystem
+ private final org.apache.spark.metrics.sink.CsvSink delegate;
+
+ /** Constructor for Spark 3.1.x. */
public CsvSink(
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
- super(properties, WithMetricsSupport.forRegistry(metricRegistry),
securityMgr);
+ delegate = newDelegate(properties,
WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+ }
+
+ /** Constructor for Spark 3.2.x and later. */
+ public CsvSink(final Properties properties, final MetricRegistry
metricRegistry) {
+ delegate = newDelegate(properties,
WithMetricsSupport.forRegistry(metricRegistry));
+ }
+
+ @Override
+ public void start() {
+ delegate.start();
+ }
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ }
+
+ @Override
+ public void report() {
+ delegate.report();
+ }
+
+ private static org.apache.spark.metrics.sink.CsvSink newDelegate(Object...
params) {
+ try {
+ Constructor<?> constructor =
org.apache.spark.metrics.sink.CsvSink.class.getConstructors()[0];
Review Comment:
:+1: but it is still more readable I believe
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]