Metrics Problem

2020-06-25 Thread Bryan Jeffrey
Hello.

I am running Spark 2.4.4 and have implemented a custom metrics producer. It
works well when I run locally, or when I specify the metrics producer only for
the driver. When I ask for executor metrics, I run into a ClassNotFoundException.

*Is it possible to pass a metrics JAR via --jars? If so, what am I missing?*

Deploy driver stats via:
--jars hdfs:///custommetricsprovider.jar
--conf spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink

However, when I pass the JAR with the metrics provider to executors via:
--jars hdfs:///custommetricsprovider.jar
--conf spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

I get ClassNotFoundException:

20/06/25 21:19:35 ERROR MetricsSystem: Sink class org.apache.spark.custommetricssink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.custommetricssink
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
    at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    ... 4 more

Is it possible to pass a metrics JAR via --jars? If so, what am I missing?

Thank you,

Bryan
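
A possible workaround, not verified against 2.4.4: the stack trace shows the
sink being instantiated from SparkEnv.create, i.e. while the executor
environment is being built and before jars passed via --jars are added to the
executor's classloader, so the sink class likely needs to be on the executor's
system classpath. For example, with the jar copied to a fixed local path on
every worker node (the path below is a placeholder):

--conf spark.executor.extraClassPath=/opt/jars/custommetricsprovider.jar
--conf spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink

Dropping the jar into $SPARK_HOME/jars on every node should achieve the same effect.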


Blog : Apache Spark Window Functions

2020-06-25 Thread neeraj bhadani
Hi Team,
 I would like to share with the community that my blog post on "Apache Spark
Window Functions" has been published. Please find the link below if anyone is
interested.

Link:
https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-window-functions-7b4e39ad3c86

Please share your thoughts and feedback.

Regards,
Neeraj


Re: Getting PySpark Partitions Locations

2020-06-25 Thread Sean Owen
You can always list the S3 output path, of course.
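
For example, a rough sketch with boto3 (the bucket name and key prefix below
are placeholders for wherever s3_output points); it collects the distinct
day=/hour=/country= directory names under the output path:

import boto3

s3 = boto3.client("s3")
prefix = "path/to/output/"  # key prefix that s3_output points at (placeholder)
partitions = set()
for page in s3.get_paginator("list_objects_v2").paginate(Bucket="my-bucket", Prefix=prefix):
    for obj in page.get("Contents", []):
        # obj["Key"] looks like path/to/output/day=2020-06-20/hour=1/country=US/part-....parquet
        rel = obj["Key"][len(prefix):]
        if "/" in rel:
            partitions.add(rel.rsplit("/", 1)[0])  # drop the file name, keep the partition dirs
print(sorted(partitions))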

On Thu, Jun 25, 2020 at 7:52 AM Tzahi File  wrote:

> Hi,
>
> I'm using pyspark to write df to s3, using the following command:
> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
>
> Is there any way to get the partitions created?
> e.g.
> day=2020-06-20/hour=1/country=US
> day=2020-06-20/hour=2/country=US
> ..
>


Re: Getting PySpark Partitions Locations

2020-06-25 Thread Sanjeev Mishra
You can use the catalog APIs; see the following:

https://stackoverflow.com/questions/54268845/how-to-check-the-number-of-partitions-of-a-spark-dataframe-without-incurring-the/54270537
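
For illustration, a minimal sketch of the catalog route, assuming the output
was saved as a partitioned table rather than to a bare path (the table name is
a placeholder):

spark.sql("SHOW PARTITIONS mydb.events").show(truncate=False)

SHOW PARTITIONS only works for tables registered in the catalog/metastore; for
data written straight to an S3 path, listing the path or selecting the
partition columns (as in the other replies) applies instead.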

On Thu, Jun 25, 2020 at 6:19 AM Tzahi File  wrote:

> I don't want to query with a distinct on the partitioned columns, the df
> contains over 1 Billion of records.
> I just want to know the partitions that were created..
>
> On Thu, Jun 25, 2020 at 4:04 PM Jörn Franke  wrote:
>
>> By doing a select on the df ?
>>
>> Am 25.06.2020 um 14:52 schrieb Tzahi File :
>>
>> 
>> Hi,
>>
>> I'm using pyspark to write df to s3, using the following command:
>> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
>>
>> Is there any way to get the partitions created?
>> e.g.
>> day=2020-06-20/hour=1/country=US
>> day=2020-06-20/hour=2/country=US
>> ..
>>


Re: Getting PySpark Partitions Locations

2020-06-25 Thread Tzahi File
I don't want to run a distinct query on the partitioned columns; the df
contains over 1 billion records.
I just want to know which partitions were created.

On Thu, Jun 25, 2020 at 4:04 PM Jörn Franke  wrote:

> By doing a select on the df ?
>
> Am 25.06.2020 um 14:52 schrieb Tzahi File :
>
> 
> Hi,
>
> I'm using pyspark to write df to s3, using the following command:
> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
>
> Is there any way to get the partitions created?
> e.g.
> day=2020-06-20/hour=1/country=US
> day=2020-06-20/hour=2/country=US
> ..
>

-- 
Tzahi File
Data Engineer


Re: Getting PySpark Partitions Locations

2020-06-25 Thread Jörn Franke
By doing a select on the df?
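
A sketch of that route, reusing s3_output from the original mail. Selecting
only the partition columns of the written Parquet output is usually much
cheaper than a distinct over the full rows, because the day/hour/country
values come from the directory names discovered at read time rather than from
the data files:

parts = (spark.read.parquet(s3_output)
         .select("day", "hour", "country")
         .distinct()
         .collect())
for p in parts:
    print("day={0}/hour={1}/country={2}".format(p["day"], p["hour"], p["country"]))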

> Am 25.06.2020 um 14:52 schrieb Tzahi File :
> 
> 
> Hi,
> 
> I'm using pyspark to write df to s3, using the following command: 
> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
> 
> Is there any way to get the partitions created?
> e.g. 
> day=2020-06-20/hour=1/country=US
> day=2020-06-20/hour=2/country=US
> ..
> 


Getting PySpark Partitions Locations

2020-06-25 Thread Tzahi File
Hi,

I'm using pyspark to write df to s3, using the following command:
"df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".

Is there any way to get the partitions created?
e.g.
day=2020-06-20/hour=1/country=US
day=2020-06-20/hour=2/country=US
..

-- 
Tzahi File
Data Engineer

email tzahi.f...@ironsrc.com
mobile +972-546864835
fax +972-77-5448273
ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
ironsrc.com 


Re: Where are all the jars gone ?

2020-06-25 Thread Anwar AliKhan
I know I can arrive at the same result with this code,

  val range100 = spark.range(1,101).agg(sum('id) as "sum").first.get(0)
  println(f"sum of range100 = $range100")

so I am not stuck; I was just curious why the code breaks using the currently
linked libraries.

spark.range(1,101).reduce(_+_)

spark-submit test

/opt/spark/spark-submit

spark.range(1,101).reduce(_+_)
:24: error: overloaded method value reduce with alternatives:
  (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long

  (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
 cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
   spark.range(1,101).reduce(_+_)



On Wed, 24 Jun 2020, 19:54 Anwar AliKhan,  wrote:

>
> I am using the method describe on this page for Scala development in
> eclipse.
>
> https://data-flair.training/blogs/create-spark-scala-project/
>
>
> in the middle of the page you will find
>
>
> “you will see lots of error due to missing libraries.
> viii. Add Spark Libraries”
>
>
> Now that I have my own build I will be pointing to the jars (spark
> libraries)
>
> in directory /opt/spark/assembly/target/scala-2.12/jars
>
>
> This way I know exactly the jar libraries I am using to remove the
> formentioned errors.
>
>
> At the same time I am trying to setup a template environment as shown here
>
>
> https://medium.com/@faizanahemad/apache-spark-setup-with-gradle-scala-and-intellij-2eeb9f30c02a
>
>
> so that I can have variables sc and spark in the eclipse editor same you
> would have spark, sc variables in the spark-shell.
>
>
> I used the word trying because the following code is broken
>
>
> spark.range(1,101).reduce(_ + _)
>
> with latest spark.
>
>
> If I use the gradle method as described then the code does work because
> it is pulling the libraries from maven repository as stipulated in
> gradle.properties.
>
>
> In my previous post I *forgot* that with a Maven pom.xml you can actually
> specify the version number of the jar you want to pull from the Maven
> repository using the *mvn clean package* command.
>
>
> So even if I use maven with eclipse then any new libraries uploaded in
> maven repository by developers will have recent version numbers. So will
> not effect my project.
>
> Can you please tell me why the code spark.range(1,101).reduce(_ + _) is
> broken with latest spark ?
>
>
> 
>
>
> On Wed, 24 Jun 2020, 17:07 Jeff Evans, 
> wrote:
>
>> If I'm understanding this correctly, you are building Spark from source
>> and using the built artifacts (jars) in some other project.  Correct?  If
>> so, then why are you concerning yourself with the directory structure that
>> Spark, internally, uses when building its artifacts?  It should be a black
>> box to your application, entirely.  You would pick the profiles (ex: Scala
>> version, Hadoop version, etc.) you need, then the install phase of Maven
>> will take care of building the jars and putting them in your local Maven
>> repo.  After that, you can resolve them from your other project seamlessly
>> (simply by declaring the org/artifact/version).
>>
>> Maven artifacts are immutable, at least released versions in Maven
>> central.  If "someone" (unclear who you are talking about) is "swapping
>> out" jars in a Maven repo then they're doing something extremely strange
>> and broken, unless they're simply replacing snapshot versions, which is a 
>> different
>> beast entirely.
>>
>> On Wed, Jun 24, 2020 at 10:39 AM Anwar AliKhan 
>> wrote:
>>
>>> THANKS
>>>
>>>
>>> It appears the directory containing the jars have been switched from
>>> download version to source version.
>>>
>>> In the download version it is just below parent directory called jars.
>>> level 1.
>>>
>>> In the git source version it is  4 levels down in the directory
>>>  /spark/assembly/target/scala-2.12/jars
>>>
>>> The issue I have with using maven is that the linking libraries can be
>>> changed at maven repository without my knowledge .
>>> So if an application compiled and worked previously could just break.
>>>
>>> It is not like when the developers make a change to the link libraries
>>> they run it by me first ,  they just upload it to maven repository with
>>> out asking me if their change
>>> Is going to impact my app.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 24 Jun 2020, 16:07 ArtemisDev,  wrote:
>>>
 If you are using Maven to manage your jar dependencies, the jar files
 are located in the maven repository on your home directory.  It is usually
 in the .m2 directory.

 Hope this helps.

 -ND
 On 6/23/20 3:21 PM, Anwar AliKhan wrote:

 Hi,

 I prefer to do most of my projects in Python and for that I use Jupyter.
 I have been 

Suggested Amendment to ./dev/make-distribution.sh

2020-06-25 Thread Anwar AliKhan
 May I suggest amending your ./dev/make-distribution.sh to check whether these
two previously mentioned packages are installed and, if not, to install them
as part of the build process? The build time will increase if the packages are
not installed, but a long build process is a normal expectation, especially
for a project that has been going for 10 years.

A message saying that these packages are needed but not installed, and asking
the user to please wait while they are being installed, would be helpful to
the user experience.






On Wed, 24 Jun 2020, 16:21 Anwar AliKhan,  wrote:

> THANKS !
>
>
> It appears that was the last dependency for the build.
> sudo apt-get install -y r-cran-e1071.
>
> Shout out to  ZOOM
> https://zoomadmin.com/HowToInstall/UbuntuPackage/r-cran-e1071  again
> like they say it was "It’s Super Easy! "
>
> package  knitr was the previous missing dependency which I was able to
> work out from build error message
> sudo apt install knitr
>
> 'e1071' doesn't appear to be a package name or namespace.
> package 'e1071' seems to be a formidable package for machine learning
> algorithms.
>
>
> *** installing help indices
> ** building package indices
> ** installing vignettes
> ** testing if installed package can be loaded from temporary location
> ** testing if installed package can be loaded from final location
> ** testing if installed package keeps a record of temporary installation
> path
> * DONE (SparkR)
> /opt/spark/R
> + popd
> + mkdir /opt/spark/dist/conf
> + cp /opt/spark/conf/fairscheduler.xml.template
> /opt/spark/conf/log4j.properties.template
> /opt/spark/conf/metrics.properties.template /opt/spark/conf/slaves.template
> /opt/spark/conf/spark-defaults.conf.template
> /opt/spark/conf/spark-env.sh.template /opt/spark/dist/conf
> + cp /opt/spark/README.md /opt/spark/dist
> + cp -r /opt/spark/bin /opt/spark/dist
> + cp -r /opt/spark/python /opt/spark/dist
> + '[' true == true ']'
> + rm -f /opt/spark/dist/python/dist/pyspark-3.1.0.dev0.tar.gz
> + cp -r /opt/spark/sbin /opt/spark/dist
> + '[' -d /opt/spark/R/lib/SparkR ']'
> + mkdir -p /opt/spark/dist/R/lib
> + cp -r /opt/spark/R/lib/SparkR /opt/spark/dist/R/lib
> + cp /opt/spark/R/lib/sparkr.zip /opt/spark/dist/R/lib
> + '[' true == true ']'
> + TARDIR_NAME=spark-3.1.0-SNAPSHOT-bin-custom-spark
> + TARDIR=/opt/spark/spark-3.1.0-SNAPSHOT-bin-custom-spark
> + rm -rf /opt/spark/spark-3.1.0-SNAPSHOT-bin-custom-spark
> + cp -r /opt/spark/dist /opt/spark/spark-3.1.0-SNAPSHOT-bin-custom-spark
> + tar czf spark-3.1.0-SNAPSHOT-bin-custom-spark.tgz -C /opt/spark
> spark-3.1.0-SNAPSHOT-bin-custom-spark
> + rm -rf /opt/spark/spark-3.1.0-SNAPSHOT-bin-custom-spark
> 
>
>
> On Wed, 24 Jun 2020, 11:07 Hyukjin Kwon,  wrote:
>
>> Looks like you haven't installed the 'e1071' package.
>>
>> On Wed, 24 Jun 2020 at 6:49 PM, Anwar AliKhan wrote:
>>
>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr
>>> -Phive -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes
>>> 
>>>
>>>
>>> Minor error: the SparkR test failed. I don't use R, so it doesn't affect me.
>>>
>>> ***installing help indices
>>> ** building package indices
>>> ** installing vignettes
>>> ** testing if installed package can be loaded from temporary location
>>> ** testing if installed package can be loaded from final location
>>> ** testing if installed package keeps a record of temporary installation
>>> path
>>> * DONE (SparkR)
>>> ++ cd /opt/spark/R/lib
>>> ++ jar cfM /opt/spark/R/lib/sparkr.zip SparkR
>>> ++ popd
>>> ++ cd /opt/spark/R/..
>>> ++ pwd
>>> + SPARK_HOME=/opt/spark
>>> + . /opt/spark/bin/load-spark-env.sh
>>> ++ '[' -z /opt/spark ']'
>>> ++ SPARK_ENV_SH=spark-env.sh
>>> ++ '[' -z '' ']'
>>> ++ export SPARK_ENV_LOADED=1
>>> ++ SPARK_ENV_LOADED=1
>>> ++ export SPARK_CONF_DIR=/opt/spark/conf
>>> ++ SPARK_CONF_DIR=/opt/spark/conf
>>> ++ SPARK_ENV_SH=/opt/spark/conf/spark-env.sh
>>> ++ [[ -f /opt/spark/conf/spark-env.sh ]]
>>> ++ set -a
>>> ++ . /opt/spark/conf/spark-env.sh
>>> +++ export SPARK_LOCAL_IP=192.168.0.786
>>> +++ SPARK_LOCAL_IP=192.168.0.786
>>> ++ set +a
>>> ++ export SPARK_SCALA_VERSION=2.12
>>> ++ SPARK_SCALA_VERSION=2.12
>>> + '[' -f /opt/spark/RELEASE ']'
>>> + SPARK_JARS_DIR=/opt/spark/assembly/target/scala-2.12/jars
>>> + '[' -d /opt/spark/assembly/target/scala-2.12/jars ']'
>>> + SPARK_HOME=/opt/spark
>>> + /usr/bin/R CMD build /opt/spark/R/pkg
>>> * checking for file ‘/opt/spark/R/pkg/DESCRIPTION’ ... OK
>>> * preparing ‘SparkR’:
>>> * checking DESCRIPTION meta-information ... OK
>>> * installing the package to build vignettes
>>> * creating vignettes ... ERROR
>>> --- re-building ‘sparkr-vignettes.Rmd’ using rmarkdown
>>>
>>> Attaching package: 'SparkR'
>>>
>>> The following objects are masked from 'package:stats':
>>>
>>> cov, filter, lag, na.omit, predict, sd, var, window
>>>
>>> The following objects are masked from 'package:base':
>>>
>>> as.data.frame, colnames, colnames<-, drop,