Re: Dynamic metric names
Hi Saisai,

Thanks a lot for the link! This is exactly what I need. Just curious, why has this PR not been merged, as it seems to implement a rather natural requirement? There are a number of use cases which can benefit from this feature, e.g.
- collecting business metrics based on the data's attributes and reporting them into the monitoring system as a side effect of the data processing
- visualizing technical metrics by means of alternative software (e.g. Grafana) - currently it's hardly possible to know the actual number of jobs, stages and tasks, and their names and IDs, in advance in order to register all the corresponding metrics statically.

Kind Regards,
Sergey

On Mon, May 6, 2019, 16:07 Saisai Shao wrote:
> I remember there was a PR about doing a similar thing (
> https://github.com/apache/spark/pull/18406). From my understanding, this
> seems like a quite specific requirement; it may require code changes to
> support your needs.
>
> Thanks
> Saisai
>
> Sergey Zhemzhitsky wrote on Sat, May 4, 2019 at 4:44 PM:
>
>> Hello Spark Users!
>>
>> Just wondering whether it is possible to register a metric source without
>> metrics known in advance and add the metrics themselves to this source
>> later on?
>>
>> It seems that currently MetricsSystem puts all the metrics from the
>> source's MetricRegistry into a shared MetricRegistry of the MetricsSystem
>> during metric source registration [1].
>>
>> So in case a new metric with a new name is added to the source's
>> registry after the source has been registered, this new metric will not
>> be reported to the sinks.
>>
>> What I'd like to achieve is to be able to register new metrics with new
>> names dynamically using a single metric source.
>> Is it somehow possible?
>>
>> [1]
>> https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
Dynamic metric names
Hello Spark Users!

Just wondering whether it is possible to register a metric source without metrics known in advance and add the metrics themselves to this source later on?

It seems that currently MetricsSystem puts all the metrics from the source's MetricRegistry into a shared MetricRegistry of the MetricsSystem during metric source registration [1].

So in case a new metric with a new name is added to the source's registry after the source has been registered, this new metric will not be reported to the sinks.

What I'd like to achieve is to be able to register new metrics with new names dynamically using a single metric source. Is it somehow possible?

[1] https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
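The setup being described can be sketched roughly as follows (illustrative only: the Source trait is private[spark] in many Spark versions, so user code may not be able to implement it directly, and the class and method names here are made up):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Hypothetical source whose metrics are not all known up front.
class DynamicSource extends Source {
  override val sourceName: String = "myapp"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Metrics added here AFTER the source has been registered via
  // SparkEnv.get.metricsSystem.registerSource(...) never reach the sinks,
  // because registration copies metrics into the shared registry only once -
  // the behaviour described above.
  def addGauge(name: String, f: () => Long): Unit =
    metricRegistry.register(name, new Gauge[Long] {
      override def getValue: Long = f()
    })
}
```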
Re: Accumulator guarantees
As far as I understand, updates of custom accumulators on the driver side happen during task completion [1].

The documentation states [2] that the very last stage in a job consists of multiple ResultTasks, which execute the task and send its output back to the driver application.

The sources also show [3] that accumulators are updated for ResultTasks just once.

So it seems that accumulators are safe to use and will be updated only once regardless of where they are used (transformations as well as actions), as long as these transformations and actions belong to the last stage. Is that correct? Could any of the Spark committers or contributors please confirm it?

[1] https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1194
[2] https://github.com/apache/spark/blob/a6fc300e91273230e7134ac6db95ccb4436c6f8f/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L36
[3] https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1204

On Thu, May 10, 2018 at 10:24 PM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> Hi there,
>
> Although Spark's docs state that there is a guarantee that
> - accumulators in actions will only be updated once
> - accumulators in transformations may be updated multiple times
>
> ... I'm wondering whether the same is true for transformations in the
> last stage of the job, or whether accumulators updated in
> transformations of the last stage are guaranteed to be updated once too?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Accumulator guarantees
Hi there,

Although Spark's docs state that there is a guarantee that
- accumulators in actions will only be updated once
- accumulators in transformations may be updated multiple times

... I'm wondering whether the same is true for transformations in the last stage of the job, or whether accumulators updated in transformations of the last stage are guaranteed to be updated once too?
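The documented distinction can be illustrated with a small sketch (hypothetical accumulator names; assumes a local SparkSession):

```scala
import org.apache.spark.sql.SparkSession

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("acc-demo").getOrCreate()
    val sc = spark.sparkContext

    val inAction = sc.longAccumulator("in-action")
    val inTransformation = sc.longAccumulator("in-transformation")

    // Updates inside a transformation may be re-applied if the stage is
    // recomputed (e.g. task retries, speculative execution).
    val rdd = sc.parallelize(1 to 100).map { x => inTransformation.add(1); x }

    // Updates performed inside an action get the once-per-task guarantee.
    rdd.foreach(_ => inAction.add(1))

    // inAction.value is reliably 100; inTransformation.value is 100 only
    // if no task of the map stage was re-executed.
    println(s"action=${inAction.value}, transformation=${inTransformation.value}")
    spark.stop()
  }
}
```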
Re: AccumulatorV2 vs AccumulableParam (V1)
Hi Wenchen,

Thanks a lot for the clarification and help. Here is what I mean regarding the remaining points.

For 2: Should we update the documentation [1] regarding custom accumulators to make it clearer and to highlight that
a) custom accumulators should always override the "copy" method to prevent unexpected behaviour, i.e. losing type information
b) custom accumulators cannot be direct anonymous subclasses of AccumulatorV2 because of a)
c) extending already existing accumulators almost always requires overriding "copy" because of a)

For 3: Here is [2] a sample that shows that the same AccumulableParam can be registered twice with different names. Here is [3] a sample that fails with an IllegalStateException on this line [4], because the accumulator's metadata is not null and it's hardly possible to reset it to null (there is no public API for such a thing). I understand that Spark internally creates different accumulators for the same AccumulableParam, and because AccumulatorV2 is stateful, using the same stateful accumulator instance in multiple places for different things is very dangerous - so maybe we should highlight this point in the documentation too?

For 5: Should we raise a JIRA for that?

[1] https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
[2] https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L36
[3] https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L59
[4] https://github.com/apache/spark/blob/4d5de4d303a773b1c18c350072344bd7efca9fc4/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L51

Kind Regards,
Sergey

On Thu, May 3, 2018 at 5:20 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
> Hi Sergey,
>
> Thanks for your valuable feedback!
>
> For 1: yea, this is definitely a bug and I have sent a PR to fix it.
> For 2: I have left my comments on the JIRA ticket.
> For 3: I don't quite understand it, can you give some concrete examples?
> For 4: yea, this is a problem, but I think it's not a big deal, and we
> couldn't find a better solution at that time.
> For 5: I think this is a real problem. It looks to me that we can merge
> `isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
> basically just the `copyAndReset`. If there is a way to fix this without
> breaking the existing API, I'm really happy to do it.
> For 6: same as 4. It's a problem but not a big deal.
>
> In general, I think accumulator v2 sacrifices some flexibility to simplify
> the framework and improve the performance. Users can still use accumulator
> v1 if flexibility is more important to them. We can keep improving
> accumulator v2 without breaking backward compatibility.
>
> Thanks,
> Wenchen
>
> On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky <szh.s...@gmail.com>
> wrote:
>>
>> Hello guys,
>>
>> I've started to migrate my Spark jobs which use Accumulators V1 to
>> AccumulatorV2 and ran into the following issues:
>>
>> 1. LegacyAccumulatorWrapper now requires the resulting type of
>> AccumulableParam to implement equals. Otherwise the AccumulableParam,
>> automatically wrapped into LegacyAccumulatorWrapper, will fail with an
>> AssertionError (SPARK-23697 [1]).
>>
>> 2. Existing AccumulatorV2 classes are difficult to extend correctly
>> (SPARK-24154 [2]) due to the "copy" method, which is called during
>> serialization and usually loses type information of descendant classes
>> which don't override "copy" (it's easier to implement an accumulator
>> from scratch than to override it correctly).
>>
>> 3. The same instance of AccumulatorV2 cannot be used with the same
>> SparkContext multiple times (unlike AccumulableParam), failing with
>> "IllegalStateException: Cannot register an Accumulator twice" even
>> after the "reset" method has been called. So it's impossible to
>> unregister an already registered accumulator from user code.
>>
>> 4. AccumulableParam (V1) implementations are usually more or less
>> stateless, while AccumulatorV2 implementations are almost always
>> stateful, leading to (unnecessary?) type checks (unlike
>> AccumulableParam). For example, a typical "merge" method of AccumulatorV2
>> requires checking whether the current accumulator is of an appropriate
>> type, like here [3].
>>
>> 5. AccumulatorV2 is more difficult to implement correctly, unlike
>> AccumulableParam. For example, in case of AccumulableParam I have to
>> implement just 3 methods (addAccumulator, addInPlace, zero), in case of
>> AccumulatorParam - just 2 methods (addInPlace, zero), and in case of
>> AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)
AccumulatorV2 vs AccumulableParam (V1)
Hello guys,

I've started to migrate my Spark jobs which use Accumulators V1 to AccumulatorV2 and ran into the following issues:

1. LegacyAccumulatorWrapper now requires the resulting type of AccumulableParam to implement equals. Otherwise the AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper, will fail with an AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are difficult to extend correctly (SPARK-24154 [2]) due to the "copy" method, which is called during serialization and usually loses type information of descendant classes which don't override "copy" (it's easier to implement an accumulator from scratch than to override it correctly).

3. The same instance of AccumulatorV2 cannot be used with the same SparkContext multiple times (unlike AccumulableParam), failing with "IllegalStateException: Cannot register an Accumulator twice" even after the "reset" method has been called. So it's impossible to unregister an already registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less stateless, while AccumulatorV2 implementations are almost always stateful, leading to (unnecessary?) type checks (unlike AccumulableParam). For example, a typical "merge" method of AccumulatorV2 requires checking whether the current accumulator is of an appropriate type, like here [3].

5. AccumulatorV2 is more difficult to implement correctly, unlike AccumulableParam. For example, in case of AccumulableParam I have to implement just 3 methods (addAccumulator, addInPlace, zero), in case of AccumulatorParam - just 2 methods (addInPlace, zero), and in case of AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value).

6. AccumulatorV2 classes can hardly be anonymous classes, because of their "copy" and "merge" methods, which typically require a concrete class to make a type check.
I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but I'm just wondering whether there is a way to simplify the API of AccumulatorV2 to address the points described above and make it less error prone?

[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654
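To make points 2 and 4 concrete, here is a minimal sketch of a custom accumulator that gets "copy" right (a hypothetical max-tracking accumulator, not taken from the thread):

```scala
import org.apache.spark.util.AccumulatorV2

// Tracks the maximum Long seen across tasks.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue

  // Returning the concrete type here is the crucial part (point 2a):
  // relying on an inherited copy() can yield an instance of the parent
  // class after serialization, silently losing the subclass behaviour.
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = _max = Long.MinValue

  override def add(v: Long): Unit = _max = math.max(_max, v)

  // The runtime type check mentioned in point 4: merge receives the
  // erased parent type, so a pattern match on the concrete class is needed.
  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
    case o: MaxAccumulator => _max = math.max(_max, o._max)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Long = _max
}
```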
Re: DataFrames :: Corrupted Data
I doubt this issue is connected with string encoding, because
- "pr^?files.10056.10040" should be "profiles.10056.10040" and is defined as a constant in the source code
- "profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@" should not occur in the exception at all, because such strings are not created within the job
- the strings being corrupted are constants defined within the job, and there is no such input data
- when YARN restarts the job for the second time after the first failure, the job completes successfully

On Wed, Mar 28, 2018 at 10:31 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> Encoding issue of the data? E.g. Spark uses UTF-8, but the source encoding is
> different?
>
>> On 28. Mar 2018, at 20:25, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>>
>> Hello guys,
>>
>> I'm using Spark 2.2.0 and from time to time my job fails, printing the
>> following errors into the log
>>
>> scala.MatchError:
>> profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>>
>> The job itself looks like the following and contains a few shuffles and UDAFs
>>
>> val df = spark.read.avro(...).as[...]
>>   .groupBy(...)
>>   .agg(collect_list(...).as(...))
>>   .select(explode(...).as(...))
>>   .groupBy(...)
>>   .agg(sum(...).as(...))
>>   .groupBy(...)
>>   .agg(collectMetrics(...).as(...))
>>
>> The errors occur in the collectMetrics UDAF in the following snippet
>>
>> key match {
>>   case "profiles.total" => updateMetrics(...)
>>   case "profiles.biz" => updateMetrics(...)
>>   case ProfileAttrsRegex(...) => updateMetrics(...)
>> }
>>
>> ... and I'm absolutely ok with the scala.MatchError, because there is no
>> "catch all" case in the pattern matching expression, but the strings
>> containing corrupted characters seem very strange.
>>
>> I've found the following JIRA issues, but it's hard to say
>> whether they are related to my case:
>> - https://issues.apache.org/jira/browse/SPARK-22092
>> - https://issues.apache.org/jira/browse/SPARK-23512
>>
>> So I'm wondering, has anybody ever seen this kind of behaviour, and
>> what could be the problem?
DataFrames :: Corrupted Data
Hello guys,

I'm using Spark 2.2.0 and from time to time my job fails, printing the following errors into the log

scala.MatchError: profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)

The job itself looks like the following and contains a few shuffles and UDAFs

val df = spark.read.avro(...).as[...]
  .groupBy(...)
  .agg(collect_list(...).as(...))
  .select(explode(...).as(...))
  .groupBy(...)
  .agg(sum(...).as(...))
  .groupBy(...)
  .agg(collectMetrics(...).as(...))

The errors occur in the collectMetrics UDAF in the following snippet

key match {
  case "profiles.total" => updateMetrics(...)
  case "profiles.biz" => updateMetrics(...)
  case ProfileAttrsRegex(...) => updateMetrics(...)
}

... and I'm absolutely ok with the scala.MatchError, because there is no "catch all" case in the pattern matching expression, but the strings containing corrupted characters seem very strange.

I've found the following JIRA issues, but it's hard to say whether they are related to my case:
- https://issues.apache.org/jira/browse/SPARK-22092
- https://issues.apache.org/jira/browse/SPARK-23512

So I'm wondering, has anybody ever seen this kind of behaviour, and what could be the problem?
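For what it's worth, a defensive variant of such a match expression can surface the corrupted bytes instead of an opaque scala.MatchError (a sketch with hypothetical key names; the real updateMetrics logic is elided):

```scala
// Adding a catch-all case turns the MatchError into a diagnostic that
// shows the exact bytes of the unexpected key, which makes it easier to
// tell genuinely bad input from corruption introduced during the shuffle.
def dispatch(key: String): Unit = key match {
  case "profiles.total" => () // updateMetrics(...) in the real job
  case "profiles.biz"   => () // updateMetrics(...) in the real job
  case other =>
    val hexBytes = other.getBytes("UTF-8").map(b => f"$b%02x").mkString(" ")
    throw new IllegalArgumentException(
      s"Unexpected key '$other' (bytes: $hexBytes)")
}
```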
Best way of shipping self-contained pyspark jobs with 3rd-party dependencies
Hi PySparkers,

What currently is the best way of shipping self-contained PySpark jobs with 3rd-party dependencies? There are some open JIRA issues [1], [2] as well as corresponding PRs [3], [4] and articles [5], [6], [7] regarding setting up the Python environment with conda and virtualenv respectively, and I believe [7] is a misleading article because of unsupported Spark options like spark.pyspark.virtualenv.enabled, spark.pyspark.virtualenv.requirements, etc.

So I'm wondering what the community does in cases when it's necessary to
- prevent Python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job dependencies
- track which dependencies are introduced on a per-job basis

[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark
[6] http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv
[7] https://community.hortonworks.com/articles/104947/using-virtualenv-with-pyspark.html
Best way of shipping self-contained pyspark jobs with 3rd-party dependencies
Hi PySparkers,

What currently is the best way of shipping self-contained PySpark jobs with 3rd-party dependencies? There are some open JIRA issues [1], [2] as well as corresponding PRs [3], [4] and articles [5], [6], regarding setting up the Python environment with conda and virtualenv respectively.

So I'm wondering what the community does in cases when it's necessary to
- prevent Python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job dependencies
- track which dependencies are introduced on a per-job basis

[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark/
[6] http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/
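One common approach along the lines of the conda-centric article [5] is to pack a per-job environment and ship it with the application archive. A sketch (the environment name, file names and script name are examples only, and this assumes conda and the conda-pack tool are available on the submitting host; only the generic --archives mechanism is guaranteed by Spark itself):

```shell
# Build and pack a per-job conda environment.
conda create -y -n myjob-env python numpy pandas
conda pack -n myjob-env -o myjob-env.tar.gz

# Ship the packed environment alongside the job; YARN unpacks the archive
# under the alias "environment" in each container's working directory,
# so no cluster-wide installation is needed and dependencies stay per-job.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
  --archives myjob-env.tar.gz#environment \
  my_job.py
```

This keeps each job's dependency set isolated and auditable (the tarball is the record of what was shipped), addressing all three bullet points above.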
What is the purpose of having RDD.context and RDD.sparkContext at the same time?
Hello Spark gurus,

Could you please shed some light on the purpose of having two identical functions in RDD: RDD.context [1] and RDD.sparkContext [2]? RDD.context seems to be used more frequently across the source code.

[1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1693
[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L146

Kind Regards,
Sergey
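For reference, at the linked lines both members appear to be simple accessors over the same underlying SparkContext. A paraphrased sketch (not a verbatim quote from RDD.scala; the wrapper class here is hypothetical):

```scala
import org.apache.spark.SparkContext

// Mirrors the two RDD members in question: both return the SparkContext
// the RDD was created with, so they are functionally interchangeable.
abstract class RddLike(sc: SparkContext) {
  def sparkContext: SparkContext = sc
  def context: SparkContext = sc
}
```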
Re: Is GraphX really deprecated?
GraphFrames seems promising, but a lot of its algorithms still involve GraphX in one way or another, or run on top of GraphX, according to the GitHub repo (https://github.com/graphframes/graphframes/tree/master/src/main/scala/org/graphframes/lib), and in case of RDDs and semi-structured data it's not really necessary to include another library that will just delegate to GraphX, which is still shipped with Spark as the default graph-processing module.

Also, doesn't the Pregel-like programming abstraction of GraphX (although it is implemented on top of RDD joins) seem more natural than a number of join steps in GraphFrames? I believe such an abstraction wouldn't hurt GraphFrames either.

On May 14, 2017 19:07, "Jules Damji" <dmat...@comast.net> wrote:

GraphFrames is not part of Spark Core, as Structured Streaming is; it's still open source and part of Spark Packages. But I anticipate that as it reaches parity with GraphX in algorithms & functionality, it's not unreasonable to anticipate its inevitable wide adoption and preference. To get a flavor, have a go at it: https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Cheers
Jules

Sent from my iPhone
Pardon the dumb thumb typos :)

On May 13, 2017, at 2:01 PM, Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

I'd like to hear the official statement too. My take on GraphX and Spark Streaming is that they are long-dead projects, with GraphFrames and Structured Streaming taking their place, respectively.

Jacek

On 13 May 2017 3:00 p.m., "Sergey Zhemzhitsky" <szh.s...@gmail.com> wrote:
> Hello Spark users,
>
> I just would like to know whether the GraphX component should be
> considered deprecated and no longer actively maintained, and should not
> be considered when starting new graph-processing projects on top of
> Spark, in favour of other graph-processing frameworks?
>
> I'm asking because
>
> 1. According to some discussions in GitHub pull requests, there are
> thoughts that GraphX is not under active development and
> can probably be deprecated soon.
>
> https://github.com/apache/spark/pull/15125
>
> 2. According to JIRA activity the GraphX component seems to be not very
> active and quite a lot of improvements are
> resolved as "Won't fix", even with pull requests provided.
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC
>
> So, I'm wondering what the community who uses GraphX, and the committers
> who develop it, think regarding this Spark component?
>
> Kind regards,
> Sergey
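For the record, the Pregel-like abstraction mentioned above looks roughly like this in GraphX - a single-source shortest paths sketch in the shape of the well-known example (the graph and sourceId are placeholders supplied by the caller):

```scala
import org.apache.spark.graphx._

// Single-source shortest paths via GraphX's Pregel API: vertices hold
// their current best distance, messages carry candidate distances.
def shortestPaths(graph: Graph[Long, Double],
                  sourceId: VertexId): Graph[Double, Double] = {
  // Initialize: distance 0 at the source, infinity everywhere else.
  val initial = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  initial.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and incoming distance.
    (id, dist, newDist) => math.min(dist, newDist),
    // Send message: propagate only along edges that improve the distance.
    triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else
        Iterator.empty,
    // Merge messages: combine candidates by taking the minimum.
    (a, b) => math.min(a, b)
  )
}
```

The whole algorithm is three small functions plus an initial graph, which is the "more natural" vertex-centric style the message above contrasts with chaining explicit join steps.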
Is GraphX really deprecated?
Hello Spark users,

I just would like to know whether the GraphX component should be considered deprecated and no longer actively maintained, and should not be considered when starting new graph-processing projects on top of Spark, in favour of other graph-processing frameworks?

I'm asking because

1. According to some discussions in GitHub pull requests, there are thoughts that GraphX is not under active development and can probably be deprecated soon.

https://github.com/apache/spark/pull/15125

2. According to JIRA activity the GraphX component seems to be not very active and quite a lot of improvements are resolved as "Won't fix", even with pull requests provided.

https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC

So, I'm wondering what the community who uses GraphX, and the committers who develop it, think regarding this Spark component?

Kind regards,
Sergey