Hi all,

I am trying to write a custom Source that counts errors and reports the
counts through Spark's sink mechanism (CSV or JMX), and I am having
trouble understanding how this works.

1. I defined the Source, added counters created with MetricRegistry and
registered the Source

> SparkEnv.get().metricsSystem().registerSource(this)


2. Used that counter (I could print out its value in the driver)

3. With CsvSink my counter is reported, but its value is always 0!

I have the following questions:
 - I suspect the Codahale Counter gets serialized into the task closures,
so the instance the tasks increment is not the instance I registered, and
the sink reports the wrong counter. I have a version using an accumulator
that works fine; I am just a little worried about performance (and
design). Is there another way of doing this? Maybe static fields?
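To illustrate the serialization suspicion, here is a minimal stdlib-only sketch (not Spark code; the Counter class is a stand-in for Codahale's): a counter that round-trips through Java serialization comes back as a distinct object, so increments on the copy never reach the original that was registered with the sink.

```java
import java.io.*;
import java.util.concurrent.atomic.AtomicLong;

public class SerializedCounterDemo {
    // Illustrative stand-in for a registered Codahale Counter.
    static class Counter implements Serializable {
        final AtomicLong value = new AtomicLong();
        void inc() { value.incrementAndGet(); }
        long getCount() { return value.get(); }
    }

    // Simulates what happens when a counter is captured in a task closure:
    // serialize it and deserialize a copy, as Spark does when shipping tasks.
    static Counter roundTrip(Counter c) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(c);
        oos.close();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (Counter) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        Counter registered = new Counter();          // what the driver registered
        Counter onExecutor = roundTrip(registered);  // what a task actually increments
        onExecutor.inc();
        System.out.println(registered.getCount());   // 0 -- this is what the sink sees
        System.out.println(onExecutor.getCount());   // 1 -- invisible to the sink
    }
}
```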

- When running on YARN, how many sink objects will be created?

- If I create a singleton object and register its counter with Spark, the
counting is right, but it is never reported from the executors. How do I
enable reporting from executors when running on YARN?
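The static-field idea I am asking about would look roughly like this (a sketch only; class and method names are mine, and I have not verified this on a cluster). Since statics are not serialized with closures, every JVM (the driver and each executor) gets its own independent copy of the holder:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Per-JVM counter holder: each JVM that loads this class keeps its own
// map, so executor-side increments stay local to that executor's JVM.
public final class Counters {
    private static final Map<String, AtomicLong> COUNTERS =
        new ConcurrentHashMap<>();

    private Counters() {}

    public static void inc(String name) {
        COUNTERS.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
    }

    public static long get(String name) {
        AtomicLong c = COUNTERS.get(name);
        return c == null ? 0L : c.get();
    }
}
```

A Source registered lazily from task code (on first use inside the executor JVM) could then expose that JVM's values through the executor's own MetricsSystem, if that is the right approach.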

My custom Source:

> import java.util.List;
>
> import com.codahale.metrics.MetricRegistry;
>
> import org.apache.spark.SparkEnv;
> import org.apache.spark.metrics.source.Source;
>
> public class CustomMonitoring implements Source {
>
>     private final MetricRegistry metricRegistry = new MetricRegistry();
>
>     public CustomMonitoring(List<String> counts) {
>         for (String count : counts) {
>             metricRegistry.counter(count);
>         }
>         SparkEnv.get().metricsSystem().registerSource(this);
>     }
>
>     @Override
>     public String sourceName() {
>         return TURBINE_CUSTOM_MONITORING; // constant defined elsewhere
>     }
>
>     @Override
>     public MetricRegistry metricRegistry() {
>         return metricRegistry;
>     }
> }


metrics.properties

> *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
> *.sink.csv.directory=/tmp/csvSink/
> *.sink.csv.period=60
> *.sink.csv.unit=seconds
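For completeness, this is roughly how I launch the job (class and jar names abbreviated); my assumption is that the metrics config also has to reach the executors, e.g. via --files, or they fall back to defaults:

```shell
# Assumed launch command: --files ships metrics.properties to each YARN
# container, and spark.metrics.conf points Spark at the shipped file.
spark-submit \
  --master yarn \
  --files metrics.properties \
  --conf spark.metrics.conf=metrics.properties \
  --class com.example.MyApp \
  myapp.jar
```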

Thank you,

Nicolae  R.
