Hi all, I am trying to write a custom Source for counting errors and report it through Spark's sink mechanism (CSV or JMX), but I am having some trouble understanding how this works.
1. I defined the Source, created counters with MetricRegistry, and registered the Source:

> SparkEnv.get().metricsSystem().registerSource(this);

2. I used the counter (I could print its value in the driver).
3. With CsvSink my counter is reported, but the value is always 0.

I have the following questions:

- I expected the Codahale Counter to be serialized and registered, but because the deserialized objects on the executors are different instances, the counter being incremented is not the registered one. I have a version using an accumulator that works fine; I am just a little worried about performance (and design). Is there another way of doing this, maybe static fields?
- When running on YARN, how many sink objects will be created?
- If I create a singleton object and register its counter with Spark, the counting is right, but it is never reported from the executors. How can I enable reporting from executors when running on YARN?

My custom Source:

> import java.util.List;
>
> import com.codahale.metrics.MetricRegistry;
> import org.apache.spark.SparkEnv;
> import org.apache.spark.metrics.source.Source;
>
> public class CustomMonitoring implements Source {
>     // Source name shown in the metric paths (value shortened here)
>     private static final String TURBINE_CUSTOM_MONITORING = "TurbineCustomMonitoring";
>
>     private final MetricRegistry metricRegistry = new MetricRegistry();
>
>     public CustomMonitoring(List<String> counts) {
>         for (String count : counts) {
>             metricRegistry.counter(count);
>         }
>         SparkEnv.get().metricsSystem().registerSource(this);
>     }
>
>     @Override
>     public String sourceName() {
>         return TURBINE_CUSTOM_MONITORING;
>     }
>
>     @Override
>     public MetricRegistry metricRegistry() {
>         return metricRegistry;
>     }
> }

metrics.properties:

> *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
> *.sink.csv.directory=/tmp/csvSink/
> *.sink.csv.period=60
> *.sink.csv.unit=seconds

Thank you,
Nicolae R.
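P.S. For context, here is roughly what the accumulator-based variant I mentioned looks like. This is a sketch, not my exact code: it assumes Spark 2.x's LongAccumulator, and the names (AccumulatorBackedSource, errorCount) are illustrative. The idea is that the accumulator carries counts from the executors back to the driver, and a Gauge registered on the driver exposes the merged value to whatever sink is configured.

```java
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.Source;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorBackedSource implements Source {
    private final MetricRegistry metricRegistry = new MetricRegistry();

    public AccumulatorBackedSource(final LongAccumulator errorCount) {
        // The sink polls this Gauge on the driver, where the accumulator's
        // merged value from all tasks is visible.
        metricRegistry.register("errorCount", (Gauge<Long>) errorCount::value);
        SparkEnv.get().metricsSystem().registerSource(this);
    }

    @Override
    public String sourceName() {
        return "AccumulatorBackedSource";
    }

    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }
}
```

Inside the tasks the executors just call errorCount.add(1); only the driver-side merged value is ever reported, which is why this version shows correct numbers while the plain Counter stays at 0.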