mosche commented on code in PR #22157:
URL: https://github.com/apache/beam/pull/22157#discussion_r923138902


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java:
##########
@@ -18,22 +18,64 @@
 package org.apache.beam.runners.spark.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.lang.reflect.Constructor;
 import java.util.Properties;
 import org.apache.beam.runners.spark.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
 /**
- * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
+ * A {@link Sink} for <a href="https://spark.apache.org/docs/latest/monitoring.html#metrics">Spark's
+ * metric system</a> that is tailored to report {@link AggregatorMetric}s to a CSV file.
+ *
+ * <p>The sink is configured using Spark configuration parameters, for example:
+ *
+ * <pre>{@code
+ * "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
+ * "spark.metrics.conf.*.sink.csv.directory"="<output_directory>"
+ * "spark.metrics.conf.*.sink.csv.period"=10
+ * "spark.metrics.conf.*.sink.csv.unit"=seconds
+ * }</pre>
  */
-// Intentionally overriding parent name because inheritors should replace the parent.
-@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
-public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
+public class CsvSink implements Sink {
+
+  // Initialized reflectively as done by Spark's MetricsSystem
+  private final org.apache.spark.metrics.sink.CsvSink delegate;
+
+  /** Constructor for Spark 3.1.x. */
   public CsvSink(
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
+  }
+
+  /** Constructor for Spark 3.2.x and later. */
+  public CsvSink(final Properties properties, final MetricRegistry metricRegistry) {
+    delegate = newDelegate(properties, WithMetricsSupport.forRegistry(metricRegistry));
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void stop() {
+    delegate.stop();
+  }
+
+  @Override
+  public void report() {
+    delegate.report();
+  }
+
+  private static org.apache.spark.metrics.sink.CsvSink newDelegate(Object... params) {
+    try {
+      Constructor<?> constructor = org.apache.spark.metrics.sink.CsvSink.class.getConstructors()[0];

Review Comment:
   👍 I can change that if you prefer, but it doesn't provide any additional safety or checks; it's the version I used initially. These constructors are only ever invoked (reflectively) by Spark's `MetricsSystem`, so supporting anything other than the default constructor lookup would just be pointless dead code.
   Also, in both cases `newInstance` takes varargs, so a signature mismatch will always fail at runtime without producing any compile-time error.
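   For illustration, the varargs pitfall mentioned above can be reproduced with plain JDK reflection. This is a minimal, hypothetical sketch: the `Delegate` class, its fields, and `newDelegate` are stand-ins invented for the example (to keep it self-contained, rather than depending on Spark's actual `CsvSink`), but the constructor-lookup pattern mirrors the one in the PR.

```java
import java.lang.reflect.Constructor;

public class ReflectiveDelegate {

  // Hypothetical stand-in for org.apache.spark.metrics.sink.CsvSink:
  // a class with exactly one declared constructor.
  static class Delegate {
    final String directory;
    final int period;

    Delegate(String directory, int period) {
      this.directory = directory;
      this.period = period;
    }
  }

  // Mirrors the newDelegate(Object... params) pattern from the PR: grab the
  // single declared constructor and invoke it with whatever was passed in.
  static Delegate newDelegate(Object... params) {
    try {
      Constructor<?> constructor = Delegate.class.getDeclaredConstructors()[0];
      return (Delegate) constructor.newInstance(params);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    // Matching arguments: (String, int) resolves fine, 10 is auto-unboxed.
    Delegate ok = newDelegate("/tmp/metrics", 10);
    System.out.println(ok.directory + " every " + ok.period + "s");

    // Wrong arity compiles without complaint (varargs!) but fails at runtime
    // with an IllegalArgumentException from Constructor.newInstance.
    try {
      newDelegate("/tmp/metrics");
    } catch (IllegalArgumentException e) {
      System.out.println("runtime failure: " + e.getClass().getSimpleName());
    }
  }
}
```

   Either lookup style, explicit (`getConstructor(Properties.class, ...)`) or positional (`getConstructors()[0]`), ends in a `newInstance(Object...)` call whose arguments are only checked at runtime, which is the point being made above.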



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]