Re: Dynamic metric names

2019-05-06 Thread Sergey Zhemzhitsky
Hi Saisai,

Thanks a lot for the link! This is exactly what I need.
Just curious why this PR has not been merged, as it seems to implement a
rather natural requirement.

There are a number of use cases which can benefit from this feature, e.g.
- collecting business metrics based on the data's attributes and reporting
them to the monitoring system as a side effect of the data processing
- visualizing technical metrics by means of alternative software (e.g.
Grafana) - currently it's hardly possible to know the actual number of
jobs, stages, and tasks, or their names and IDs, in advance to register all
the corresponding metrics statically (a small sketch follows this list).
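
To make the first use case concrete, here is a minimal sketch of the kind of
dynamic registration I have in mind, using plain Dropwizard Metrics (which
Spark's MetricsSystem is built on) rather than Spark's Source trait, since the
latter isn't really accessible from user code; the metric names and the console
reporter are just placeholders:

import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

object DynamicMetricNamesSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry

    // Stand-in for a Spark sink (e.g. the Graphite or JMX sink).
    val reporter = ConsoleReporter.forRegistry(registry).build()
    reporter.start(10, TimeUnit.SECONDS)

    // Metric names are derived from the data at processing time,
    // so they cannot be known when the registry ("the source") is created.
    val keysSeenInData = Seq("profiles.total", "profiles.biz", "profiles.10056.10040")
    keysSeenInData.foreach { key =>
      // counter(name) creates the counter on first use and reuses it afterwards.
      registry.counter(MetricRegistry.name("business", key)).inc()
    }
  }
}

The missing piece is a supported way to have Spark's MetricsSystem pick up
counters that appear in a source's registry only after the source has been
registered, which is exactly what the referenced PR seems to address.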


Kind Regards,
Sergey


On Mon, May 6, 2019, 16:07 Saisai Shao  wrote:

> I remember there was a PR about doing a similar thing (
> https://github.com/apache/spark/pull/18406). From my understanding, this
> seems like quite a specific requirement, and it may require code changes to
> support your needs.
>
> Thanks
> Saisai
>
> On Sat, May 4, 2019 at 4:44 PM, Sergey Zhemzhitsky wrote:
>
>> Hello Spark Users!
>>
>> Just wondering whether it is possible to register a metric source without
>> metrics known in advance and add the metrics themselves to this source
>> later on?
>>
>> It seems that currently MetricsSystem puts all the metrics from the
>> source's MetricRegistry into the shared MetricRegistry of the MetricsSystem
>> during metric source registration [1].
>>
>> So if a new metric with a new name is added to the source's registry
>> after the source has been registered, this new metric will not be
>> reported to the sinks.
>>
>> What I'd like to achieve is to be able to register new metrics with new
>> names dynamically using a single metric source.
>> Is it somehow possible?
>>
>>
>> [1]
>> https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
>>
>


Dynamic metric names

2019-05-04 Thread Sergey Zhemzhitsky
Hello Spark Users!

Just wondering whether it is possible to register a metric source without
metrics known in advance and add the metrics themselves to this source
later on?

It seems that currently MetricsSystem puts all the metrics from the source's
MetricRegistry into the shared MetricRegistry of the MetricsSystem during
metric source registration [1].

So if a new metric with a new name is added to the source's registry after
the source has been registered, this new metric will not be reported to the
sinks.

What I'd like to achieve is to be able to register new metrics with new
names dynamically using a single metric source.
Is it somehow possible?


[1]
https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162


Re: Accumulator guarantees

2018-05-10 Thread Sergey Zhemzhitsky
As far as I understand, updates of custom accumulators on the
driver side happen during task completion [1].
The documentation states [2] that the very last stage in a job
consists of multiple ResultTasks, which execute the task and send its
output back to the driver application.
The sources also show [3] that accumulators are updated for
ResultTasks just once.

So it seems that accumulators are safe to use and will be updated
only once regardless of where they are used (transformations as well
as actions), as long as these transformations and actions belong to
the last stage. Is that correct?
Could any of the Spark committers or contributors please confirm this?
(A small illustrative snippet follows the references below.)


[1] 
https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1194
[2] 
https://github.com/apache/spark/blob/a6fc300e91273230e7134ac6db95ccb4436c6f8f/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L36
[3] 
https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1204
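
Just to make the question concrete, here is a hypothetical snippet (assuming an
existing SparkContext named sc) where the first accumulator is updated in a
transformation and the second one in an action, both within what ends up being
the last stage of the job:

import org.apache.spark.SparkContext

def countBothWays(sc: SparkContext): Unit = {
  val inTransformation = sc.longAccumulator("updated-in-map")
  val inAction = sc.longAccumulator("updated-in-foreach")

  val numbers = sc.parallelize(1 to 100, numSlices = 4)

  // Updated inside a transformation: the docs only say the update
  // may be applied more than once if the stage is re-executed.
  val doubled = numbers.map { x => inTransformation.add(1); x * 2 }

  // Updated inside an action: the docs guarantee each task's update
  // is applied exactly once.
  doubled.foreach { _ => inAction.add(1) }

  println(s"map: ${inTransformation.value}, foreach: ${inAction.value}")
}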

On Thu, May 10, 2018 at 10:24 PM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> Hi there,
>
> Although Spark's docs state that there is a guarantee that
> - accumulators in actions will only be updated once
> - accumulators in transformations may be updated multiple times
>
> ... I'm wondering whether the same is true for transformations in the
> last stage of the job, i.e. whether accumulators updated in
> transformations of the last stage are guaranteed to be updated only
> once too?




Accumulator guarantees

2018-05-10 Thread Sergey Zhemzhitsky
Hi there,

Although Spark's docs state that there is a guarantee that
- accumulators in actions will only be updated once
- accumulators in transformations may be updated multiple times

... I'm wondering whether the same is true for transformations in the
last stage of the job, i.e. whether accumulators updated in
transformations of the last stage are guaranteed to be updated only
once too?




Re: AccumulatorV2 vs AccumulableParam (V1)

2018-05-04 Thread Sergey Zhemzhitsky
Hi Wenchen,

Thanks a lot for clarification and help.

Here is what I mean regarding the remaining points

For 2: Should we update the documentation [1] regarding custom
accumulators to be clearer and to highlight that
  a) custom accumulators should always override the "copy" method to
prevent unexpected behaviour caused by losing type information
  b) custom accumulators cannot be direct anonymous subclasses of
AccumulatorV2 because of a)
  c) extending already existing accumulators almost always requires
overriding "copy" because of a) (a minimal illustration of a) follows this list)

For 3: Here is [2] a sample that shows that the same
AccumulableParam can be registered twice with different names.
Here is [3] a sample that fails with IllegalStateException on this
line [4], because the accumulator's metadata is not null and it's hardly
possible to reset it to null (there is no public API for such a
thing).
I understand that Spark internally creates different accumulators for
the same AccumulableParam, and because AccumulatorV2 is stateful,
using the same stateful accumulator instance in multiple places for
different things is very dangerous, so maybe we should highlight this
point in the documentation too?

For 5: Should we raise a JIRA for that?


[1] https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
[2] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L36
[3] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L59
[4] 
https://github.com/apache/spark/blob/4d5de4d303a773b1c18c350072344bd7efca9fc4/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L51


Kind Regards,
Sergey

On Thu, May 3, 2018 at 5:20 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
> Hi Sergey,
>
> Thanks for your valuable feedback!
>
> For 1: yea this is definitely a bug and I have sent a PR to fix it.
> For 2: I have left my comments on the JIRA ticket.
> For 3: I don't quite understand it, can you give some concrete examples?
> For 4: yea this is a problem, but I think it's not a big deal, and we
> couldn't find a better solution at that time.
> For 5: I think this is a real problem. It looks to me that we can merge
> `isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
> basically just the `copyAndReset`. If there is a way to fix this without
> breaking the existing API, I'm really happy to do it.
> For 6: same as 4. It's a problem but not a big deal.
>
> In general, I think accumulator v2 sacrifices some flexibility to simplify
> the framework and improve the performance. Users can still use accumulator
> v1 if flexibility is more important to them. We can keep improving
> accumulator v2 without breaking backward compatibility.
>
> Thanks,
> Wenchen
>
> On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky <szh.s...@gmail.com>
> wrote:
>>
>> Hello guys,
>>
>> I've started to migrate my Spark jobs which use Accumulators V1 to
>> AccumulatorV2 and have faced the following issues:
>>
>> 1. LegacyAccumulatorWrapper now requires the resulting type of the
>> AccumulableParam to implement equals. Otherwise the
>> AccumulableParam, automatically wrapped into a LegacyAccumulatorWrapper,
>> will fail with an AssertionError (SPARK-23697 [1]).
>>
>> 2. Existing AccumulatorV2 classes are hard to extend
>> easily and correctly (SPARK-24154 [2]) due to their "copy" method, which
>> is called during serialization and usually loses the type information of
>> descendant classes that don't override "copy" (it's easier to
>> implement an accumulator from scratch than to override it correctly).
>>
>> 3. The same instance of AccumulatorV2 cannot be used with the same
>> SparkContext multiple times (unlike AccumulableParam), failing with
>> "IllegalStateException: Cannot register an Accumulator twice" even
>> after the "reset" method is called. So it's impossible to unregister an
>> already registered accumulator from user code.
>>
>> 4. AccumulableParam (V1) implementations are usually more or less
>> stateless, while AccumulatorV2 implementations are almost always
>> stateful, leading to (unnecessary?) type checks (unlike
>> AccumulableParam). For example, a typical "merge" method of AccumulatorV2
>> has to check whether the current accumulator is of an appropriate
>> type, as here [3].
>>
>> 5. AccumulatorV2 is more difficult to implement correctly than
>> AccumulableParam. For example, in the case of AccumulableParam I have to
>> implement just 3 methods (addAccumulator, addInPlace, zero), in the case
>> of AccumulatorParam - just 2 methods (addInPlace, zero), and in the case of
>> AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)

AccumulatorV2 vs AccumulableParam (V1)

2018-05-02 Thread Sergey Zhemzhitsky
Hello guys,

I've started to migrate my Spark jobs which use Accumulators V1 to
AccumulatorV2 and have faced the following issues:

1. LegacyAccumulatorWrapper now requires the resulting type of the
AccumulableParam to implement equals. Otherwise the
AccumulableParam, automatically wrapped into a LegacyAccumulatorWrapper,
will fail with an AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are hard to extend
easily and correctly (SPARK-24154 [2]) due to their "copy" method, which
is called during serialization and usually loses the type information of
descendant classes that don't override "copy" (it's easier to
implement an accumulator from scratch than to override it correctly).

3. The same instance of AccumulatorV2 cannot be used with the same
SparkContext multiple times (unlike AccumulableParam), failing with
"IllegalStateException: Cannot register an Accumulator twice" even
after the "reset" method is called. So it's impossible to unregister an
already registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less
stateless, while AccumulatorV2 implementations are almost always
stateful, leading to (unnecessary?) type checks (unlike
AccumulableParam). For example, a typical "merge" method of AccumulatorV2
has to check whether the current accumulator is of an appropriate
type, as here [3].

5. AccumulatorV2 is more difficult to implement correctly than
AccumulableParam. For example, in the case of AccumulableParam I have to
implement just 3 methods (addAccumulator, addInPlace, zero), in the case of
AccumulatorParam - just 2 methods (addInPlace, zero), and in the case of
AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value);
a sketch of a complete implementation follows this list.

6. AccumulatorV2 classes can hardly be anonymous classes,
because of their "copy" and "merge" methods, which typically require a
concrete class to make a type check.
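
For reference, a minimal sketch of a complete AccumulatorV2 implementation (a
hypothetical accumulator collecting distinct string keys), just to show all six
methods together with the type check from point 4 and the concrete-typed "copy"
from point 2:

import scala.collection.mutable

import org.apache.spark.util.AccumulatorV2

class DistinctKeysAccumulator extends AccumulatorV2[String, Set[String]] {
  private val keys = mutable.Set.empty[String]

  override def isZero: Boolean = keys.isEmpty

  // Must return the concrete subclass, otherwise type information is lost
  // when Spark copies the accumulator during serialization (point 2).
  override def copy(): DistinctKeysAccumulator = {
    val acc = new DistinctKeysAccumulator
    acc.keys ++= keys
    acc
  }

  override def reset(): Unit = keys.clear()

  override def add(v: String): Unit = keys += v

  // Typically needs a type check (point 4).
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
    case o: DistinctKeysAccumulator => keys ++= o.keys
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Set[String] = keys.toSet
}

It would then be registered with sc.register(new DistinctKeysAccumulator, "distinctKeys").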

I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
I'm just wondering whether there is a way to simplify the AccumulatorV2
API to address the points described above and make it less error-prone?


[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654




Re: DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
I suppose it's hardly possible that this issue is connected with
string encoding, because

- "pr^?files.10056.10040" should be "profiles.10056.10040" and is
defined as a constant in the source code
- 
"profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@"
should not occur in the exception at all, because such strings are not
created within the job
- the strings being corrupted are defined within the job and there is
no such input data
- when YARN restarts the job for the second time after the first
failure, the job completes successfully




On Wed, Mar 28, 2018 at 10:31 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> An encoding issue in the data? E.g. Spark uses UTF-8, but the source encoding is
> different?
>
>> On 28. Mar 2018, at 20:25, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>>
>> Hello guys,
>>
>> I'm using Spark 2.2.0, and from time to time my job fails, printing the
>> following errors into the log:
>>
>> scala.MatchError:
>> profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>>
>> The job itself looks like the following and contains a few shuffles and UDAFs
>>
>> val df = spark.read.avro(...).as[...]
>>  .groupBy(...)
>>  .agg(collect_list(...).as(...))
>>  .select(explode(...).as(...))
>>  .groupBy(...)
>>  .agg(sum(...).as(...))
>>  .groupBy(...)
>>  .agg(collectMetrics(...).as(...))
>>
>> The errors occur in the collectMetrics UDAF in the following snippet
>>
>> key match {
>>  case "profiles.total" => updateMetrics(...)
>>  case "profiles.biz" => updateMetrics(...)
>>  case ProfileAttrsRegex(...) => updateMetrics(...)
>> }
>>
>> ... and I'm absolutely ok with the scala.MatchError itself, because there is
>> no "catch-all" case in the pattern matching expression, but the strings
>> containing corrupted characters seem very strange.
>>
>> I've found the following JIRA issues, but it's hard to say
>> whether they are related to my case:
>> - https://issues.apache.org/jira/browse/SPARK-22092
>> - https://issues.apache.org/jira/browse/SPARK-23512
>>
>> So I'm wondering, has anybody ever seen this kind of behaviour, and
>> what could the problem be?
>>




DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
Hello guys,

I'm using Spark 2.2.0, and from time to time my job fails, printing the
following errors into the log:

scala.MatchError:
profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)

The job itself looks like the following and contains a few shuffles and UDAFs

val df = spark.read.avro(...).as[...]
  .groupBy(...)
  .agg(collect_list(...).as(...))
  .select(explode(...).as(...))
  .groupBy(...)
  .agg(sum(...).as(...))
  .groupBy(...)
  .agg(collectMetrics(...).as(...))

The errors occur in the collectMetrics UDAF in the following snippet

key match {
  case "profiles.total" => updateMetrics(...)
  case "profiles.biz" => updateMetrics(...)
  case ProfileAttrsRegex(...) => updateMetrics(...)
}

... and I'm absolutely ok with the scala.MatchError itself, because there is no
"catch-all" case in the pattern matching expression, but the strings
containing corrupted characters seem very strange.
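
As a mitigation on my side I'm thinking about a catch-all case that at least
dumps the raw bytes of the unexpected key instead of failing with a bare
MatchError; a rough sketch (the regex here is a hypothetical stand-in for the
real one):

import java.nio.charset.StandardCharsets

val ProfileAttrsRegex = """profiles\.(\d+)\.(\d+)""".r // hypothetical stand-in

def classifyKey(key: String): String = key match {
  case "profiles.total" => "total"
  case "profiles.biz" => "biz"
  case ProfileAttrsRegex(group, attr) => s"attr:$group.$attr"
  case other =>
    // Surface the raw bytes so a corrupted key can be inspected.
    val hex = other.getBytes(StandardCharsets.UTF_8).map(b => f"${b & 0xff}%02x").mkString(" ")
    throw new IllegalArgumentException(s"Unexpected key '$other' (utf-8 bytes: $hex)")
}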

I've found the following JIRA issues, but it's hard to say
whether they are related to my case:
- https://issues.apache.org/jira/browse/SPARK-22092
- https://issues.apache.org/jira/browse/SPARK-23512

So I'm wondering, has anybody ever seen this kind of behaviour, and
what could the problem be?




Best way of shipping self-contained pyspark jobs with 3rd-party dependencies

2017-12-08 Thread Sergey Zhemzhitsky
Hi PySparkers,

What is currently the best way of shipping self-contained pyspark jobs
with 3rd-party dependencies?
There are some open JIRA issues [1], [2] as well as corresponding PRs
[3], [4] and articles [5], [6], [7] regarding setting up the Python
environment with conda and virtualenv respectively, and I believe [7]
is a misleading article, because it relies on unsupported Spark options like
spark.pyspark.virtualenv.enabled,
spark.pyspark.virtualenv.requirements, etc.

So I'm wondering what the community does in cases when it's necessary to
- prevent Python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job dependencies
- track which dependencies are introduced on a per-job basis


[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark
[6] http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv
[7] 
https://community.hortonworks.com/articles/104947/using-virtualenv-with-pyspark.html




Best way of shipping self-contained pyspark jobs with 3rd-party dependencies

2017-12-07 Thread Sergey Zhemzhitsky
Hi PySparkers,

What is currently the best way of shipping self-contained pyspark jobs with
3rd-party dependencies?
There are some open JIRA issues [1], [2] as well as corresponding PRs [3],
[4] and articles [5], [6] regarding setting up the Python environment with
conda and virtualenv respectively.

So I'm wondering what the community does in cases when it's necessary to
- prevent Python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job
dependencies
- track which dependencies are introduced on a per-job basis


[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark/
[6]
http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/


What is the purpose of having RDD.context and RDD.sparkContext at the same time?

2017-06-27 Thread Sergey Zhemzhitsky
Hello spark gurus,

Could you please shed some light on the purpose of having two
identical functions in RDD,
RDD.context [1] and RDD.sparkContext [2]?

RDD.context seems to be used more frequently across the source code.

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1693
[2]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L146
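
For what it's worth, a quick (hypothetical) check, assuming an existing
SparkContext named sc, shows that both accessors return the very same instance:

import org.apache.spark.SparkContext

def sameContext(sc: SparkContext): Boolean = {
  val rdd = sc.parallelize(Seq(1, 2, 3))
  // Both return the SparkContext that created the RDD.
  (rdd.context eq rdd.sparkContext) && (rdd.sparkContext eq sc)
}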

Kind Regards,
Sergey


Re: Is GraphX really deprecated?

2017-05-16 Thread Sergey Zhemzhitsky
GraphFrames seems promising, but it still has a lot of algorithms which involve
GraphX in one way or another, or run on top of GraphX, according to the GitHub
repo (
https://github.com/graphframes/graphframes/tree/master/src/main/scala/org/graphframes/lib),
and in the case of RDDs and semi-structured data it's not really necessary to
include another library that will just delegate to GraphX, which is still
shipped with Spark as the default graph-processing module.

Also, doesn't the Pregel-like programming abstraction of GraphX (although it is
built on top of RDD joins) seem more natural than a number of join steps in
GraphFrames (see the shortest-paths sketch below)? I believe such an
abstraction wouldn't hurt GraphFrames either.
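
To illustrate what I mean by the Pregel abstraction, here is the classic
single-source shortest paths example, roughly as it appears in the GraphX
programming guide (assuming an existing SparkContext named sc; the random
generator graph is just for demonstration):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

def shortestPaths(sc: SparkContext, sourceId: VertexId = 42L): Graph[Double, Double] = {
  // A random graph whose edge attributes are reinterpreted as distances.
  val graph: Graph[Long, Double] =
    GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

  // Every vertex starts at infinity except the source.
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  initialGraph.pregel(Double.PositiveInfinity)(
    (id, dist, newDist) => math.min(dist, newDist), // vertex program
    triplet => // send messages along edges that improve the distance
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else
        Iterator.empty,
    (a, b) => math.min(a, b) // merge incoming messages
  )
}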



On May 14, 2017 19:07, "Jules Damji" <dmat...@comast.net> wrote:

GraphFrames is not part of Spark core the way Structured Streaming is; it's
still open source and part of Spark Packages. But I anticipate that as it
reaches parity with GraphX in algorithms & functionality, it will see
wide adoption and become the preferred choice.

To get a flavor have a go at it:
https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Cheers
Jules

Sent from my iPhone
Pardon the dumb thumb typos :)

On May 13, 2017, at 2:01 PM, Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

I'd like to hear the official statement too.

My take on GraphX and Spark Streaming is that they are long dead projects
with GraphFrames and Structured Streaming taking their place, respectively.

Jacek

On 13 May 2017 3:00 p.m., "Sergey Zhemzhitsky" <szh.s...@gmail.com> wrote:

> Hello Spark users,
>
> I would just like to know whether the GraphX component should be
> considered deprecated and no longer actively maintained,
> and should not be considered when starting new graph-processing projects
> on top of Spark in favour of other
> graph-processing frameworks?
>
> I'm asking because
>
> 1. According to some discussions in GitHub pull requests, there are
> thoughts that GraphX is not under active development and
> can probably be deprecated soon.
>
> https://github.com/apache/spark/pull/15125
>
> 2. According to JIRA activity, the GraphX component seems to be not very
> active, and quite a lot of improvements are
> resolved as "Won't Fix" even when pull requests are provided.
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC
>
> So, I'm wondering what the community that uses GraphX, and the committers who
> develop it, think regarding this Spark component?
>
> Kind regards,
> Sergey
>
>
>
>
>


Is GraphX really deprecated?

2017-05-13 Thread Sergey Zhemzhitsky
Hello Spark users,

I would just like to know whether the GraphX component should be considered
deprecated and no longer actively maintained,
and should not be considered when starting new graph-processing projects on top
of Spark in favour of other
graph-processing frameworks?

I'm asking because

1. According to some discussions in GitHub pull requests, there are thoughts 
that GraphX is not under active development and
can probably be deprecated soon.

https://github.com/apache/spark/pull/15125

2. According to JIRA activity, the GraphX component seems to be not very active,
and quite a lot of improvements are
resolved as "Won't Fix" even when pull requests are provided.

https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC

So, I'm wondering what the community that uses GraphX, and the committers who develop
it, think regarding this Spark component?

Kind regards,
Sergey


